您的位置:首页 > 编程语言 > Java开发

spring+netty 服务器

2015-09-06 17:40 459 查看
一、 java NIO简介

nio是java New IO的简称,在jdk1.4里提供的新api。Sun官方标榜的特性如下:

 为所有的原始类型提供(Buffer)缓存支持。

 字符集编码解码解决方案。

 Channel:一个新的原始I/O抽象。

 支持锁和内存映射文件的文件访问接口。

 提供多路(non-bloking)非阻塞式的高伸缩性网络I/O。

关于java NIO的实现部分不是本文讨论的重点,有兴趣的朋友可以访问JAVA夜无眠的博客JAVA NIO 实例。

二、 NIO框架简介

在Java社区,最知名的开源Java NIO框架要属Mina和Netty。实际上,Netty的作者原来就是Mina作者之一,所以可以想到,Netty和Mina在设计理念上会有很多共同点。而本文主要介绍的是使用netty搭建简单的游戏服务器,对于netty与mina的比较以及简单netty应用教程,将在其他文章中有所提及,敬请关注!

三、 netty游戏框架搭建

a) ServerBootstrap——netty框架的总入口

Java代码


/**
*  作者:chenpeng 
* E-mail:46731706@qq.com 
*  创建时间:2012-7-12 下午12:22:53 
*  类说明  netty game
*/  
public class ServerTest { 

 
 
public static
void main(String[] args) { 
  DOMConfigurator.configureAndWatch("config/log4j.xml"); 

  ApplicationContext factory = new FileSystemXmlApplicationContext( 

    new String[] { "config/propholder.xml" }); 

   
     ServerBootstrap bootstrap = new ServerBootstrap(  

             new NioServerSocketChannelFactory(  

                     Executors.newCachedThreadPool(),  
                     Executors.newCachedThreadPool()));  
     ServerPipelineFactory httpServerPipelineFactory=(ServerPipelineFactory)factory.getBean("serverPipelineFactory"); 

     bootstrap.setPipelineFactory(httpServerPipelineFactory);  

     //启动端口 8888 
     bootstrap.bind(new InetSocketAddress(8888));  

     System.out.print("8888  server is starting……"); 

 
      

 


/**
*  作者:chenpeng
* E-mail:46731706@qq.com
*  创建时间:2012-7-12 下午12:22:53
*  类说明  netty game
*/
public class ServerTest {

public static void main(String[] args) {
DOMConfigurator.configureAndWatch("config/log4j.xml");
ApplicationContext factory = new FileSystemXmlApplicationContext(
new String[] { "config/propholder.xml" });

ServerBootstrap bootstrap = new ServerBootstrap(
new NioServerSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool()));
ServerPipelineFactory httpServerPipelineFactory=(ServerPipelineFactory)factory.getBean("serverPipelineFactory");
bootstrap.setPipelineFactory(httpServerPipelineFactory);
//启动端口 8888
bootstrap.bind(new InetSocketAddress(8888));
System.out.print("8888  server is starting……");

}

}


b) ChannelPipeline

channelPipeline是一系列channelHandler的集合,他参照J2ee中的Intercepting Filter模式来实现的,让用户完全掌握如果在一个handler中处理事件,同时让pipeline里面的多个handler可以相互交互。

Java代码


import org.jboss.netty.channel.ChannelPipeline; 

import org.jboss.netty.channel.ChannelPipelineFactory; 

import org.jboss.netty.channel.Channels; 

 
import com.cp.netty.coder.Decoder; 

import com.cp.netty.coder.Encoder; 

 
/**
*   作者:chenpeng 
*   E-mail:46731706@qq.com 
*   创建时间:2012-7-12 上午11:28:56 
*   channelPipeline是一系列channelHandler的集合,他参照J2ee中的Intercepting Filter模式来实现的,

*   让用户完全掌握如果在一个handler中处理事件,同时让pipeline里面的多个handler可以相互交互

*/  
public class ServerPipelineFactory
implements ChannelPipelineFactory { 

    public ServerHandler serverHandler; 

 
    public ChannelPipeline getPipeline()
throws Exception { 
        ChannelPipeline pipeLine = Channels.pipeline(); 
        pipeLine.addLast("decoder",
new Decoder(Integer.MAX_VALUE, 0,
4)); 
        pipeLine.addLast("encoder",
new Encoder(4)); 

        pipeLine.addLast("handler", serverHandler); 

        return pipeLine; 
    } 
 
    public ServerHandler getServerHandler() { 

        return serverHandler; 

    } 
 
    public void setServerHandler(ServerHandler serverHandler) { 

        this.serverHandler = serverHandler; 

    } 
 


import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;

import com.cp.netty.coder.Decoder;
import com.cp.netty.coder.Encoder;

/**
* 	作者:chenpeng
*	E-mail:46731706@qq.com
* 	创建时间:2012-7-12 上午11:28:56
* 	channelPipeline是一系列channelHandler的集合,他参照J2ee中的Intercepting Filter模式来实现的,
* 	让用户完全掌握如果在一个handler中处理事件,同时让pipeline里面的多个handler可以相互交互
*/
public class ServerPipelineFactory implements ChannelPipelineFactory {
public ServerHandler serverHandler;

public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeLine = Channels.pipeline();
pipeLine.addLast("decoder", new Decoder(Integer.MAX_VALUE, 0, 4));
pipeLine.addLast("encoder", new Encoder(4));
pipeLine.addLast("handler", serverHandler);
return pipeLine;
}

public ServerHandler getServerHandler() {
return serverHandler;
}

public void setServerHandler(ServerHandler serverHandler) {
this.serverHandler = serverHandler;
}

}


c) Decoder——消息解码器

Decoder解码器继承于FrameDecoder,FrameDecoder是Netty codec包中的辅助类,它是个ChannelUpstreamHandler,decode方法是FrameDecoder子类需要实现的。在本程序采用的是LengthFieldBasedFrameDecoder。LengthFieldBasedFrameDecoder是基于长度字段的解码器。如果协议格式类似“内容长度”+内容、“固定头”+“内容长度”+动态内容这样的格式,就可以使用该解码器 。至于其他类型的解码器,这里不再一一介绍。

Java代码


import org.jboss.netty.buffer.ChannelBuffer; 

import org.jboss.netty.channel.Channel; 

import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder; 

 
 
/**
*   作者:chenpeng 
*   E-mail:46731706@qq.com 
*   创建时间:2012-7-12 上午11:22:14 
*   协议解码器 
*/  
public class Decoder
extends LengthFieldBasedFrameDecoder { 

    // 第一个参数为信息最大长度,超过这个长度回报异常, 

    // 第二参数为长度属性的起始(偏移)位,我们的协议中长度是0到第3个字节,所以这里写0, 

    // 第三个参数为“长度属性”的长度,我们是4个字节,所以写4, 

    // 第四个参数为长度调节值,在总长被定义为包含包头长度时,修正信息长度, 

    // 第五个参数为跳过的字节数,根据需要我们跳过前4个字节,以便接收端直接接受到不含“长度属性”的内容。 

 
    public Decoder(int maxFrameLength,
int lengthFieldOffset, 
            int lengthFieldLength) { 

        super(maxFrameLength, lengthFieldOffset, lengthFieldLength); 

    } 
 
    @Override 
    protected Object decode(ChannelHandlerContext ctx, Channel channel, 

            ChannelBuffer buffer) throws Exception { 

        ChannelBuffer buffs = (ChannelBuffer)super.decode(ctx, channel, buffer);  

        return buffs; 
    } 
 


import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;

/**
* 	作者:chenpeng
*	E-mail:46731706@qq.com
* 	创建时间:2012-7-12 上午11:22:14
* 	协议解码器
*/
public class Decoder extends LengthFieldBasedFrameDecoder {
// 第一个参数为信息最大长度,超过这个长度回报异常,
// 第二参数为长度属性的起始(偏移)位,我们的协议中长度是0到第3个字节,所以这里写0,
// 第三个参数为“长度属性”的长度,我们是4个字节,所以写4,
// 第四个参数为长度调节值,在总长被定义为包含包头长度时,修正信息长度,
// 第五个参数为跳过的字节数,根据需要我们跳过前4个字节,以便接收端直接接受到不含“长度属性”的内容。

public Decoder(int maxFrameLength, int lengthFieldOffset,
int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
}

@Override
protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ChannelBuffer buffs = (ChannelBuffer)super.decode(ctx, channel, buffer);
return buffs;
}

}


d) ServerHandler——消息分发器

在介绍这个类之前,先对几个概念进行简要说明:

1. Channel:channel 是负责数据读、写的对象。channel是双向的,既可以write 也可以read。而且在NIO中用户不应该直接从channel中读写数据,而是应该通过buffer,通过buffer再将数据读写到channel中。

一个channel 可以提供给用户下面几个信息

(1)channel的当前状态,比如open 还是closed

(2)ChannelConfig对象,表示channel的一些参数,比如bufferSize

(3)channel支持的所有i/o操作(比如read,write,connect.bind)

2. channelEvent:ChannelEvent广义的认为Channel相关的事件处理。他分为Upstream events和downstream events两大块。如果以服务器端为主体,那么client到server的数据传输过程是Upstream,而server到client的数据传输过程则是downstream;以客户端为主体的过程正好相反。一下主要介绍以服务器端为主体的开发。

3. 常用的Upstream events包括

a) messageReceived:信息被接受时 ---MessageEvent

b) exceptionCaught:产生异常时 ---ExceptionEvent

c) channelOpen:channel被开启时 ---ChannelStateEvent

d) channelClosed:channel被关闭时 ---ChannelStateEvent

e) channelBound:channel被开启并准备去连接但还未连接上的时候 ---ChannelStateEvent

f) channelUnbound:channel被开启不准备去连接时候 ---ChannelStateEvent

g) channelConnected:channel被连接上的时候 ---ChannelStateEvent

h) channelDisconnected:channel连接断开的时候 ---ChannelStateEvent

在本游戏架构里,ServerHandler扮演着创建线程、验证消息、分发消息的重要角色,程序如下:

Java代码


import java.util.concurrent.ConcurrentLinkedQueue; 

 
import org.apache.log4j.Logger; 

import org.jboss.netty.buffer.ChannelBuffer; 

import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.channel.ChannelStateEvent; 

import org.jboss.netty.channel.ExceptionEvent; 

import org.jboss.netty.channel.MessageEvent; 

import org.jboss.netty.channel.SimpleChannelUpstreamHandler; 

 
import com.cp.game.HandlerDispatcher; 

import com.cp.game.domain.MessageQueue; 

import com.cp.netty.domain.GameRequest; 

 
/**
*   作者:chenpeng 
*   E-mail:46731706@qq.com 
*   创建时间:2012-7-12 下午12:02:52 
*   游戏协议接收分发器
*/  
public class ServerHandler
extends SimpleChannelUpstreamHandler { 

    public Logger log = Logger.getLogger(this.getClass()); 

    public static HandlerDispatcher handlerDispatcher; 

 
 
    public void init() { 

        new Thread(handlerDispatcher).start(); 

    } 
 
     
 
    /* (non-Javadoc)
     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)

     *  建立一个新channel
     */ 
    @Override 
    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) 

            throws Exception { 

        log.debug("进来一个channel:" + ctx.getChannel().getId()); 

        MessageQueue messageQueue = new MessageQueue( 

                new ConcurrentLinkedQueue<GameRequest>()); 

        handlerDispatcher.addMessageQueue(ctx.getChannel().getId(), messageQueue); 

 
    } 
 
    /* (non-Javadoc)
     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)

     *  玩家主动关闭channel
     */ 
    @Override 
    public void channelDisconnected(ChannelHandlerContext ctx, 

            ChannelStateEvent e) throws Exception { 

        log.error("关掉一个channel:" + ctx.getChannel().getId()); 

        handlerDispatcher.removeMessageQueue(e.getChannel().getId().toString()); 

        e.getChannel().close(); 
    } 
 
    /* (non-Javadoc)
     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ExceptionEvent)

     *  玩家被动关闭channel
     */ 
    @Override 
    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) 

            throws Exception { 

        log.error("出异常啦……" + ctx.getChannel().getId()); 

        e.getCause().printStackTrace(); 
        handlerDispatcher.removeMessageQueue(e.getChannel().getId().toString()); 

        e.getChannel().close(); 
    } 
 
    /* (non-Javadoc)
     * @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.MessageEvent)

     *  消息接收处理器
     */ 
    @Override 
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) 

            throws Exception { 

        ChannelBuffer buffs = (ChannelBuffer) e.getMessage(); 
        buffs.skipBytes(4);// 越过dataLength的字节 

        byte[] decoded = new
byte[buffs.readableBytes()]; 
        buffs.readBytes(decoded); 
        GameRequest gameRequest = new GameRequest(e.getChannel(), decoded); 

 
        // 通知回调协议 
        handlerDispatcher.addMessage(gameRequest); 
    } 
 
    public HandlerDispatcher getHandlerDispatcher() { 

        return handlerDispatcher; 

    } 
 
    public void setHandlerDispatcher(HandlerDispatcher handlerDispatcher) { 

        ServerHandler.handlerDispatcher = handlerDispatcher; 
    } 
 


import java.util.concurrent.ConcurrentLinkedQueue;

import org.apache.log4j.Logger;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelUpstreamHandler;

import com.cp.game.HandlerDispatcher;
import com.cp.game.domain.MessageQueue;
import com.cp.netty.domain.GameRequest;

/**
* 	作者:chenpeng
*	E-mail:46731706@qq.com
* 	创建时间:2012-7-12 下午12:02:52
* 	游戏协议接收分发器
*/
public class ServerHandler extends SimpleChannelUpstreamHandler {
public Logger log = Logger.getLogger(this.getClass());
public static HandlerDispatcher handlerDispatcher;

public void init() {
new Thread(handlerDispatcher).start();
}

/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelConnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
*	建立一个新channel
*/
@Override
public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
log.debug("进来一个channel:" + ctx.getChannel().getId());
MessageQueue messageQueue = new MessageQueue(
new ConcurrentLinkedQueue<GameRequest>());
handlerDispatcher.addMessageQueue(ctx.getChannel().getId(), messageQueue);

}

/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#channelDisconnected(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ChannelStateEvent)
*	玩家主动关闭channel
*/
@Override
public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
log.error("关掉一个channel:" + ctx.getChannel().getId());
handlerDispatcher.removeMessageQueue(e.getChannel().getId().toString());
e.getChannel().close();
}

/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#exceptionCaught(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.ExceptionEvent)
*	玩家被动关闭channel
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
log.error("出异常啦……" + ctx.getChannel().getId());
e.getCause().printStackTrace();
handlerDispatcher.removeMessageQueue(e.getChannel().getId().toString());
e.getChannel().close();
}

/* (non-Javadoc)
* @see org.jboss.netty.channel.SimpleChannelUpstreamHandler#messageReceived(org.jboss.netty.channel.ChannelHandlerContext, org.jboss.netty.channel.MessageEvent)
*	消息接收处理器
*/
@Override
public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
ChannelBuffer buffs = (ChannelBuffer) e.getMessage();
buffs.skipBytes(4);// 越过dataLength的字节
byte[] decoded = new byte[buffs.readableBytes()];
buffs.readBytes(decoded);
GameRequest gameRequest = new GameRequest(e.getChannel(), decoded);

// 通知回调协议
handlerDispatcher.addMessage(gameRequest);
}

public HandlerDispatcher getHandlerDispatcher() {
return handlerDispatcher;
}

public void setHandlerDispatcher(HandlerDispatcher handlerDispatcher) {
ServerHandler.handlerDispatcher = handlerDispatcher;
}

}


需要注意的是:HandlerDispatcher是一个多线程处理器,用于处理游戏逻辑请求。这部分功能可根据用户的不同需求进行定制。

e) Encoder——消息编码器

消息编码器主要完成的是对游戏逻辑处理器返回的数据进行编码,组合成符合客户端规范的消息格式并发送。

Java代码


import org.jboss.netty.buffer.ChannelBuffer; 

import org.jboss.netty.buffer.ChannelBuffers; 

import org.jboss.netty.channel.Channel; 

import org.jboss.netty.channel.ChannelHandlerContext; 

import org.jboss.netty.handler.codec.frame.LengthFieldPrepender; 

 
import com.cp.netty.domain.GameResponse; 

 
/**
*   作者:chenpeng 
*   E-mail:46731706@qq.com 
*   创建时间:2012-7-12 上午11:43:11 
*   类说明 
*/  
public class Encoder
extends LengthFieldPrepender { 

 
    public Encoder(int lengthFieldLength) { 

        super(lengthFieldLength); 

    } 
 
    @Override 
    protected Object encode(ChannelHandlerContext cxt, Channel channel, 

            Object msg) throws Exception { 

         
        ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory()); 

        GameResponse response = (GameResponse) msg; 
        buffer.writeInt(response.getRtMessage().length+20); 

        buffer.writeInt(response.getCommondId()); 
        buffer.writeInt(response.getPlayerId()); 
        buffer.writeInt(response.getCommandType()); 
        buffer.writeLong(response.getTime()); 
        System.out.println("send message "+response.getCommondId()); 

        buffer.writeBytes(response.getRtObj().getBytesM()); 
        return buffer; 
 
    } 
 


import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;

import com.cp.netty.domain.GameResponse;

/**
* 	作者:chenpeng
*	E-mail:46731706@qq.com
* 	创建时间:2012-7-12 上午11:43:11
* 	类说明
*/
public class Encoder extends LengthFieldPrepender {

public Encoder(int lengthFieldLength) {
super(lengthFieldLength);
}

@Override
protected Object encode(ChannelHandlerContext cxt, Channel channel,
Object msg) throws Exception {

ChannelBuffer buffer = ChannelBuffers.dynamicBuffer(channel.getConfig().getBufferFactory());
GameResponse response = (GameResponse) msg;
buffer.writeInt(response.getRtMessage().length+20);
buffer.writeInt(response.getCommondId());
buffer.writeInt(response.getPlayerId());
buffer.writeInt(response.getCommandType());
buffer.writeLong(response.getTime());
System.out.println("send message "+response.getCommondId());
buffer.writeBytes(response.getRtObj().getBytesM());
return buffer;

}

}


Java代码


<?xml version="1.0" encoding="UTF-8"?> 

<beans xmlns="http://www.springframework.org/schema/beans" 

    xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 

    xmlns:aop="http://www.springframework.org/schema/aop" 

    xmlns:tx="http://www.springframework.org/schema/tx" 

    xsi:schemaLocation="  
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.5.xsd
 
            http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop-2.5.xsd
 
 
            http://www.springframework.org/schema/tx http://www.springframework.org/schema/tx/spring-tx-2.5.xsd"> 

 
    <bean id="serverPipelineFactory" 

        class="com.cp.netty.ServerPipelineFactory" 

        scope="prototype"> 
        <property name="serverHandler" ref="appHandler"></property> 

    </bean> 
 
    <bean id="appHandler" class="com.cp.netty.ServerHandler" 

        init-method="init"> 
        <property name="handlerDispatcher" ref="handlerDispatcher" /> 

    </bean> 
    <bean id="handlerDispatcher" 

        class="com.cp.game.HandlerDispatcher" init-method="init" 

        destroy-method="stop"> 

        <property name="messageExecutor"> 

            <bean class="com.cp.netty.domain.FiexThreadPoolExecutor" 

                destroy-method="shutdown"> 

                <constructor-arg 
                    value="${app.dispatcher.pool.corePoolSize}" /> 

                <constructor-arg 
                    value="${app.dispatcher.pool.maximumPoolSize}" /> 

                <constructor-arg 
                    value="${app.dispatcher.pool.keepAliveSecond}" /> 

            </bean> 
        </property> 
        <property name="sleepTime" value="${app.dispatcher.sleepTime}" /> 

        <property name="handlerMap" ref="serverHandlerMapping" /> 

    </bean> 
 
    <bean id="serverMainController"
class="com.cp.game.ServerMainHandler" 

        abstract="true"> 

    </bean> 
     
       <bean id="serverHandlerMapping"
class="java.util.HashMap"> 

        <constructor-arg> 
            <map> 
                <!-- 测试协议 --> 
                <entry key="1000"> 

                    <bean 
                        class="com.cp.game.handler.common.InitHandler" 

                        parent="serverMainController"> 

                    </bean> 
                </entry> 
            </map> 
        </constructor-arg> 
    </bean> 
 
</beans> 
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: