netty初步
2013-07-24 13:59
316 查看
netty是java的高性能socket框架,linux下基epoll,这里不对他多牛逼作分析,网上资料很多,这里针对一般socket的业务作个例子
几个基本概念:
channel类似于socket句柄的抽象
pipeline是每个socket里面的eventHandler的处理响应链
每个socket(channel)绑定一个pipeline,,每个pipeline绑定若干个handler,netty里面的handler,专门用来处理和业务有关的东西,handler有upHandler和downHandler,down用来处理发包,up用来处理收包,大概的示例图看这里
注意上面的123的顺序,很重要,在netty里面,处理顺序如图,对于up类的收包处理,最靠近收包层的顺序越靠前;对于down类的包处理,最靠近收包层的顺序越靠后
还有一些encoder和decoder,encoder用来在发包之前进行加密,decoder在收包以后进行解码,然后业务数据跳到事件处理流程。
下面具体上代码,版本是netty3.6
MessageClientHandler.java
MessageDecoder.java
MessageEncoder.java
用来测试的TestClientDownHandlerA.java
用来测试的TestClientDownHandlerB.java
MessageClientPipelineFactory.java
MessageClient.java
客户端代码完成
以下为服务端测试代码
MessageDecoder和Messagencoder照抄,这个都是一样的
MessageServerHandler.java
TestServerUpHandlerA.java
TestServerUpHandlerB.java
MessageServerPipelineFactory.java
MessageServer.java
通过测试,可以发现handler的处理顺序,和图上面的是一致的;还有可以参考下encoder和decoder的写法,改改,直接用于项目。
几个基本概念:
channel类似于socket句柄的抽象
pipeline是每个socket里面的eventHandler的处理响应链
每个socket(channel)绑定一个pipeline,,每个pipeline绑定若干个handler,netty里面的handler,专门用来处理和业务有关的东西,handler有upHandler和downHandler,down用来处理发包,up用来处理收包,大概的示例图看这里
注意上面的123的顺序,很重要,在netty里面,处理顺序如图,对于up类的收包处理,最靠近收包层的顺序越靠前;对于down类的包处理,最靠近收包层的顺序越靠后
还有一些encoder和decoder,encoder用来在发包之前进行加密,decoder在收包以后进行解码,然后业务数据跳到事件处理流程。
下面具体上代码,版本是netty3.6
MessageClientHandler.java
package com.netty.test.client; import java.util.logging.Level; import java.util.logging.Logger; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ChannelStateEvent; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; public class MessageClientHandler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger( MessageClientHandler.class.getName()); @Override public void channelConnected( ChannelHandlerContext ctx, ChannelStateEvent e) { String message = "hello kafka0102"; e.getChannel().write(message); } @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. System.err.println("client messageReceived send message "+e.getMessage()); try { Thread.sleep(1000*3); } catch (Exception ex) { ex.printStackTrace(); } e.getChannel().write(e.getMessage()); } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) { // Close the connection when an exception is raised. logger.log( Level.WARNING, "Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); } }
MessageDecoder.java
package com.netty.test.client; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.frame.FrameDecoder; public class MessageDecoder extends FrameDecoder { @Override protected Object decode( ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception { if (buffer.readableBytes() < 4) { return null;//(1) } int dataLength = buffer.getInt(buffer.readerIndex()); if (buffer.readableBytes() < dataLength + 4) { return null;//(2) } buffer.skipBytes(4);//(3) byte[] decoded = new byte[dataLength]; buffer.readBytes(decoded); String msg = new String(decoded);//(4) return msg; } }
MessageEncoder.java
package com.netty.test.client; import org.jboss.netty.buffer.ChannelBuffer; import org.jboss.netty.buffer.ChannelBuffers; import org.jboss.netty.channel.Channel; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.handler.codec.oneone.OneToOneEncoder; public class MessageEncoder extends OneToOneEncoder { @Override protected Object encode( ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception { if (!(msg instanceof String)) { return msg;//(1) } String res = (String)msg; byte[] data = res.getBytes(); int dataLength = data.length; ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2) buf.writeInt(dataLength); buf.writeBytes(data); return buf;//(3) } }
用来测试的TestClientDownHandlerA.java
package com.netty.test.client; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.SimpleChannelDownstreamHandler; public class TestClientDownHandlerA extends SimpleChannelDownstreamHandler { public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { System.err.println("test client Down handlerA "); super.handleDownstream(ctx, e); } }
用来测试的TestClientDownHandlerB.java
package com.netty.test.client; import org.jboss.netty.channel.ChannelEvent; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.SimpleChannelDownstreamHandler; public class TestClientDownHandlerB extends SimpleChannelDownstreamHandler { public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception { System.err.println("test client Down handlerB "); super.handleDownstream(ctx, e); } }
MessageClientPipelineFactory.java
package com.netty.test.client; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; import com.netty.test.client.MessageDecoder; import com.netty.test.client.MessageEncoder; public class MessageClientPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new MessageDecoder()); pipeline.addLast("encoder", new MessageEncoder()); pipeline.addLast("handler", new MessageClientHandler()); pipeline.addFirst("testClientDownHandlerA", new TestClientDownHandlerA()); pipeline.addFirst("testClientDownHandlerB", new TestClientDownHandlerB()); return pipeline; } }
MessageClient.java
package com.netty.test.client; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ClientBootstrap; import org.jboss.netty.channel.ChannelFuture; import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; public class MessageClient { public static void main(String[] args) throws Exception { // Parse options. String host = "127.0.0.1"; int port = 8888; // Configure the client. ClientBootstrap bootstrap = new ClientBootstrap( new NioClientSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the event pipeline factory. bootstrap.setPipelineFactory(new MessageClientPipelineFactory()); // Start the connection attempt. ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port)); // Wait until the connection is closed or the connection attempt fails. future.getChannel().getCloseFuture().awaitUninterruptibly(); // Shut down thread pools to exit. bootstrap.releaseExternalResources(); } }
客户端代码完成
以下为服务端测试代码
MessageDecoder和Messagencoder照抄,这个都是一样的
MessageServerHandler.java
package com.netty.test.server; import java.util.logging.Level; import java.util.logging.Logger; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; public class MessageServerHandler extends SimpleChannelUpstreamHandler { private static final Logger logger = Logger.getLogger( MessageServerHandler.class.getName()); @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { if (!(e.getMessage() instanceof String)) { return;//(1) } String msg = (String) e.getMessage(); System.err.println("got msg:"+msg); e.getChannel().write(msg);//(2) } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) { logger.log( Level.WARNING, "Unexpected exception from downstream.", e.getCause()); e.getChannel().close(); } }
TestServerUpHandlerA.java
package com.netty.test.server; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; public class TestServerUpHandlerA extends SimpleChannelUpstreamHandler { @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. System.err.println("server test upHandlerA get message "+e.getMessage()); try { super.messageReceived(ctx, e); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) { e.getChannel().close(); } }
TestServerUpHandlerB.java
package com.netty.test.server; import org.jboss.netty.channel.ChannelHandlerContext; import org.jboss.netty.channel.ExceptionEvent; import org.jboss.netty.channel.MessageEvent; import org.jboss.netty.channel.SimpleChannelUpstreamHandler; public class TestServerUpHandlerB extends SimpleChannelUpstreamHandler { @Override public void messageReceived( ChannelHandlerContext ctx, MessageEvent e) { // Send back the received message to the remote peer. System.err.println("server test upHandlerB get message "+e.getMessage()); try { super.messageReceived(ctx, e); } catch (Exception e1) { // TODO Auto-generated catch block e1.printStackTrace(); } } @Override public void exceptionCaught( ChannelHandlerContext ctx, ExceptionEvent e) { e.getChannel().close(); } }
MessageServerPipelineFactory.java
package com.netty.test.server; import org.jboss.netty.channel.ChannelPipeline; import org.jboss.netty.channel.ChannelPipelineFactory; import org.jboss.netty.channel.Channels; public class MessageServerPipelineFactory implements ChannelPipelineFactory { public ChannelPipeline getPipeline() throws Exception { ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", new MessageDecoder()); pipeline.addLast("encoder", new MessageEncoder()); pipeline.addLast("handler", new MessageServerHandler()); // pipeline.addFirst("testServerUpHandlerA", new TestServerUpHandlerA()); // pipeline.addFirst("testServerUpHandlerB", new TestServerUpHandlerB()); return pipeline; } }
MessageServer.java
package com.netty.test.server; import java.net.InetSocketAddress; import java.util.concurrent.Executors; import org.jboss.netty.bootstrap.ServerBootstrap; import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory; public class MessageServer { public static void main(String[] args) throws Exception { // Configure the server. ServerBootstrap bootstrap = new ServerBootstrap( new NioServerSocketChannelFactory( Executors.newCachedThreadPool(), Executors.newCachedThreadPool())); // Set up the default event pipeline. bootstrap.setPipelineFactory(new MessageServerPipelineFactory()); // Bind and start to accept incoming connections. bootstrap.bind(new InetSocketAddress(8888)); } }
通过测试,可以发现handler的处理顺序,和图上面的是一致的;还有可以参考下encoder和decoder的写法,改改,直接用于项目。