您的位置:首页 > 其它

netty初步

2013-07-24 13:59 316 查看
netty是java的高性能socket框架,linux下基epoll,这里不对他多牛逼作分析,网上资料很多,这里针对一般socket的业务作个例子

几个基本概念:

  channel类似于socket句柄的抽象
  pipeline是每个socket里面的eventHandler的处理响应链

每个socket(channel)绑定一个pipeline,,每个pipeline绑定若干个handler,netty里面的handler,专门用来处理和业务有关的东西,handler有upHandler和downHandler,down用来处理发包,up用来处理收包,大概的示例图看这里



注意上面的123的顺序,很重要,在netty里面,处理顺序如图,对于up类的收包处理,最靠近收包层的顺序越靠前;对于down类的包处理,最靠近收包层的顺序越靠后

还有一些encoder和decoder,encoder用来在发包之前进行加密,decoder在收包以后进行解码,然后业务数据跳到事件处理流程。

下面具体上代码,版本是netty3.6

MessageClientHandler.java

package com.netty.test.client;

import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class MessageClientHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(
MessageClientHandler.class.getName());

@Override
public void channelConnected(
ChannelHandlerContext ctx, ChannelStateEvent e) {
String message = "hello kafka0102";
e.getChannel().write(message);
}

@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
System.err.println("client messageReceived send message "+e.getMessage());
try {
Thread.sleep(1000*3);
} catch (Exception ex) {
ex.printStackTrace();
}
e.getChannel().write(e.getMessage());
}

@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
// Close the connection when an exception is raised.
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}


MessageDecoder.java

package com.netty.test.client;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class MessageDecoder extends FrameDecoder {

@Override
protected Object decode(
ChannelHandlerContext ctx, Channel channel, ChannelBuffer buffer) throws Exception {
if (buffer.readableBytes() < 4) {
return null;//(1)
}
int dataLength = buffer.getInt(buffer.readerIndex());
if (buffer.readableBytes() < dataLength + 4) {
return null;//(2)
}

buffer.skipBytes(4);//(3)
byte[] decoded = new byte[dataLength];
buffer.readBytes(decoded);
String msg = new String(decoded);//(4)
return msg;
}
}


MessageEncoder.java

package com.netty.test.client;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.oneone.OneToOneEncoder;

public class MessageEncoder extends OneToOneEncoder {

@Override
protected Object encode(
ChannelHandlerContext ctx, Channel channel, Object msg) throws Exception {
if (!(msg instanceof String)) {
return msg;//(1)
}

String res = (String)msg;
byte[] data = res.getBytes();
int dataLength = data.length;
ChannelBuffer buf = ChannelBuffers.dynamicBuffer();//(2)
buf.writeInt(dataLength);
buf.writeBytes(data);
return buf;//(3)
}
}


用来测试的TestClientDownHandlerA.java

package com.netty.test.client;

import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;

public class TestClientDownHandlerA extends SimpleChannelDownstreamHandler {

public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
System.err.println("test client Down handlerA ");

super.handleDownstream(ctx, e);
}
}


用来测试的TestClientDownHandlerB.java

package com.netty.test.client;

import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.SimpleChannelDownstreamHandler;

public class TestClientDownHandlerB extends SimpleChannelDownstreamHandler {

public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent e)
throws Exception {
System.err.println("test client Down handlerB ");

super.handleDownstream(ctx, e);
}
}


MessageClientPipelineFactory.java

package com.netty.test.client;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;

import com.netty.test.client.MessageDecoder;
import com.netty.test.client.MessageEncoder;

public class MessageClientPipelineFactory implements ChannelPipelineFactory {

public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("handler", new MessageClientHandler());

pipeline.addFirst("testClientDownHandlerA", new TestClientDownHandlerA());
pipeline.addFirst("testClientDownHandlerB", new TestClientDownHandlerB());

return pipeline;
}
}


MessageClient.java

package com.netty.test.client;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.channel.ChannelFuture;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;

public class MessageClient {

public static void main(String[] args) throws Exception {
// Parse options.
String host = "127.0.0.1";
int port = 8888;
// Configure the client.
ClientBootstrap bootstrap = new ClientBootstrap(
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
// Set up the event pipeline factory.
bootstrap.setPipelineFactory(new MessageClientPipelineFactory());
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection is closed or the connection attempt fails.
future.getChannel().getCloseFuture().awaitUninterruptibly();
// Shut down thread pools to exit.
bootstrap.releaseExternalResources();
}
}


客户端代码完成

以下为服务端测试代码

MessageDecoder和Messagencoder照抄,这个都是一样的

MessageServerHandler.java

package com.netty.test.server;
import java.util.logging.Level;
import java.util.logging.Logger;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class MessageServerHandler extends SimpleChannelUpstreamHandler {

private static final Logger logger = Logger.getLogger(
MessageServerHandler.class.getName());

@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
if (!(e.getMessage() instanceof String)) {
return;//(1)
}
String msg = (String) e.getMessage();
System.err.println("got msg:"+msg);
e.getChannel().write(msg);//(2)
}

@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {
logger.log(
Level.WARNING,
"Unexpected exception from downstream.",
e.getCause());
e.getChannel().close();
}
}


TestServerUpHandlerA.java

package com.netty.test.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class TestServerUpHandlerA extends SimpleChannelUpstreamHandler {

@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
System.err.println("server test upHandlerA get message "+e.getMessage());

try {
super.messageReceived(ctx, e);
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}

@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {

e.getChannel().close();
}
}


TestServerUpHandlerB.java

package com.netty.test.server;

import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

public class TestServerUpHandlerB extends SimpleChannelUpstreamHandler {

@Override
public void messageReceived(
ChannelHandlerContext ctx, MessageEvent e) {
// Send back the received message to the remote peer.
System.err.println("server test upHandlerB get message "+e.getMessage());

try {
super.messageReceived(ctx, e);
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
}

@Override
public void exceptionCaught(
ChannelHandlerContext ctx, ExceptionEvent e) {

e.getChannel().close();
}
}


MessageServerPipelineFactory.java

package com.netty.test.server;

import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;

public class MessageServerPipelineFactory implements
ChannelPipelineFactory {

public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = Channels.pipeline();

pipeline.addLast("decoder", new MessageDecoder());
pipeline.addLast("encoder", new MessageEncoder());
pipeline.addLast("handler", new MessageServerHandler());

//        pipeline.addFirst("testServerUpHandlerA", new TestServerUpHandlerA());
//        pipeline.addFirst("testServerUpHandlerB", new TestServerUpHandlerB());

return pipeline;
}
}


MessageServer.java

package com.netty.test.server;

import java.net.InetSocketAddress;
import java.util.concurrent.Executors;
import org.jboss.netty.bootstrap.ServerBootstrap;
import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;

public class MessageServer {

public static void main(String[] args) throws Exception {
// Configure the server.
ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));

// Set up the default event pipeline.
bootstrap.setPipelineFactory(new MessageServerPipelineFactory());

// Bind and start to accept incoming connections.
bootstrap.bind(new InetSocketAddress(8888));
}
}


通过测试,可以发现handler的处理顺序,和图上面的是一致的;还有可以参考下encoder和decoder的写法,改改,直接用于项目。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: