
CTS Network Communication Module - Using Netty Handlers

2016-01-18 14:44
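Recently I have been developing a software testing service platform. Its underlying network communication layer used to be built on JMX and has now been switched to TCP communication with Netty 4, listening on a designated port and supporting HTTP/Telnet access, with ProtoBuf used for message exchange between modules. This post records some of the technical points about Netty handlers that came up during development.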

This post is based on Nicholas's blog post at http://www.znetdevelopment.com/blogs/2009/04/21/netty-using-handlers/, with some minor changes for Netty 4.

Before I start, let me say that I am in no way affiliated with JBoss or Netty (although I may choose to submit future code as part of the open source community). Therefore, all comments, code samples, etc are mine or a derivative of examples provided by Netty.
I apologize up front for any possible discrepancies or errors (please let me know though, so I can fix and update them). I believe this documentation to be completely accurate, though.

In Netty, the main injection point into your code or business logic is through the use of handlers. Handlers follow the interceptor pattern similar to filters in a traditional servlet-based web application. For more information, see Netty documentation. Handlers
provide an event model that allows an application to monitor incoming/outgoing data, modify the data, convert the data, act upon the data, etc. In essence, they allow you to completely abstract separate concerns into separate classes without needing to have
class A know about class B. As an example, you can add a log handler to listen for incoming data and print it out. This handler can be added and removed without modifying any other class. Handlers, however, are more often used to build protocol stacks in
which one handler acts as a codec to decode/encode a stream of bytes into higher-level objects, which are then acted upon by another handler such as a business logic handler.

Channel handlers in Netty start from a channel pipeline. Handlers are added to the pipeline in a specific order, and that order determines how and when the handlers are invoked. Thus, if you have a handler that depends on another handler (a codec, for instance),
you need to make sure that the codec handler comes before it in the pipeline. **As data enters the system, often asynchronously, it is wrapped in a channel buffer object (ChannelBuffer in Netty 3, ByteBuf in Netty 4) that provides zero-copy transparency and higher-level access methods.
The object then flows upstream (inbound, in Netty 4 terms) from the first handler to the last handler, unless a handler chooses to break the flow or an exception is thrown. If any write events occur (typically triggered by the last handler), the data flows downstream (outbound) from the last
handler back to the first handler.** A better diagram of this is available in the channel pipeline API documentation. This is why order is important: it ensures that the handlers get invoked at the proper time.
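
The ordering rule can be illustrated without Netty at all. The following is a minimal JDK-only sketch, not Netty's API: `PipelineOrderSketch`, `addLast`, and `fireRead` are hypothetical names, and only the inbound direction is modeled. A "framer" strips the line terminator and a "decoder" parses the result, so the decoder only works if the framer runs first.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

// Hypothetical JDK-only model of handler ordering; this is not Netty's API.
class PipelineOrderSketch {
    private final List<Function<Object, Object>> inbound = new ArrayList<>();

    // Appends a handler to the end of the pipeline.
    PipelineOrderSketch addLast(Function<Object, Object> handler) {
        inbound.add(handler);
        return this;
    }

    // Inbound data visits the handlers in the order they were added.
    Object fireRead(Object msg) {
        Object current = msg;
        for (Function<Object, Object> handler : inbound) {
            current = handler.apply(current);
        }
        return current;
    }

    public static void main(String[] args) {
        Function<Object, Object> framer = m -> ((String) m).trim();
        Function<Object, Object> decoder = m -> Integer.valueOf((String) m);
        Object result = new PipelineOrderSketch()
                .addLast(framer)
                .addLast(decoder)
                .fireRead("42\r\n");
        System.out.println(result); // prints 42
    }
}
```

Swapping the two `addLast` calls makes the decoder see the raw `"42\r\n"` string and fail with a `NumberFormatException`, which is the kind of failure a misordered codec produces.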

Channel handlers are characterized along two dimensions. First, they are either stateful or stateless. Second, they are either upstream, downstream, or both. As for statefulness, a handler can choose to maintain state inside its class
or choose to do all its logic in its event callback method(s). This choice is critical to the application, however. **A new pipeline is created for each and every incoming connection**. Thus, if you add a handler to a pipeline factory and re-use that same
handler for each pipeline instance, then multiple asynchronous worker threads can be simultaneously accessing data in the handler. If you maintain state in your handler, this can quickly become an issue. The default pipeline factory in Netty actually works
this way by re-using handlers. The flip side is that if you create a new handler every time, you pay for the extra allocation and garbage collection. Thus, you need to decide up front how each and every handler will be used.
If a handler is stateless, has no instance variables, and can act as a singleton, then you should re-use that particular handler within the pipeline factory. If a handler is stateful, such as a frame or codec decoder, then you should create a new instance
with every pipeline. Note that you could also choose to use a singleton instance of a stateful handler and synchronize the methods. However, I would be hesitant to try that method as you will reduce overall throughput when thousands of connections all try
to access the same data. **In most situations, especially on server class machines, you are better off creating a new handler with every connection.** For me personally, I try to use stateless handlers as much as possible and generally only use stateful
handlers for frame decoders, which reconstruct a frame from the fragmented TCP stream (i.e., a single frame sent by a client can arrive at the server as multiple fragments that must be reassembled into a single frame; frame decoders in Netty make this easy to
support, as we will see later).
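
To make the risk of sharing a stateful handler concrete, here is a small JDK-only sketch. `LineAccumulator` is a hypothetical stand-in for a frame decoder and is not a Netty class; it keeps a partial line in an instance field, which is exactly the kind of state discussed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stateful "framer": accumulates characters until '\n'. Not Netty API.
class LineAccumulator {
    // Per-instance state: the fragment received so far.
    private final StringBuilder partial = new StringBuilder();

    // Feeds one fragment and returns the lines completed by it.
    List<String> feed(String fragment) {
        List<String> lines = new ArrayList<>();
        for (char c : fragment.toCharArray()) {
            if (c == '\n') {
                lines.add(partial.toString());
                partial.setLength(0);
            } else {
                partial.append(c);
            }
        }
        return lines;
    }
}
```

With one instance per connection, feeding `"HEL"` to connection A, `"WOR"` to connection B, and then `"LO\n"` to A yields `HELLO` for A. With a single shared instance, the same three calls yield `HELWORLO`, because fragments from both connections land in the same buffer. This happens even on a single thread; concurrent worker threads only make it worse.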

The second part of handlers is the responsibility: upstream, downstream, or both. Upstream handlers are used to process incoming data whereas downstream handlers are used to process outgoing data. Upstream handlers offer functionality such as decryption,
decompression, framing, decoding/codecs, and business logic. Generally, business logic is the last handler and ultimately responds to a request by writing a response. The response then flows downstream through handlers such as encoding, compression, and
encryption. Note that often times the downstream handlers are a mirror of the upstream handlers. For this reason, many handlers choose to implement both the upstream and downstream functionality in a single class. The choice of separate or combined classes
is entirely your decision and depends on how much abstraction of code you want. I prefer to keep everything separate and simple to more easily change implementations. For example, you may choose to keep a particular decoder functionality but upgrade the
encoder to a better performing implementation. By separating the classes, you have the freedom to switch the encoder without touching the decoder. This results in fewer regressions.

In order to use handlers, you have to use the default pipeline or create a custom pipeline factory. **Remember that the default pipeline re-uses existing handlers and treats them as singletons.** Thus, if you have any stateful handlers, then you must create
a custom pipeline factory. Creating a custom pipeline factory is extremely easy however, as shown below.

// Netty 3 API (org.jboss.netty)
import org.jboss.netty.channel.ChannelHandler;
import org.jboss.netty.channel.ChannelPipeline;
import org.jboss.netty.channel.ChannelPipelineFactory;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.handler.codec.frame.DelimiterBasedFrameDecoder;
import org.jboss.netty.handler.codec.string.StringDecoder;
import org.jboss.netty.handler.codec.string.StringEncoder;

public class MyPipelineFactory implements ChannelPipelineFactory
{
    // stateless, singleton handler instances...re-used across connections
    private static final ChannelHandler STR_ENCODER = new StringEncoder();
    private static final ChannelHandler STR_DECODER = new StringDecoder();
    private static final ChannelHandler APP_HANDLER = new MyAppHandler();
    private static final ChannelHandler LOG_HANDLER = new LogHandler();

    public ChannelPipeline getPipeline() throws Exception
    {
        // create default pipeline from static method
        ChannelPipeline pipeline = Channels.pipeline();

        // add logger to print incoming and outgoing data
        // this is both an upstream/downstream handler
        pipeline.addLast("logger", LOG_HANDLER);

        // add delimiter-based frame decoder...this is stateful by maintaining
        // a buffer state of defragmented packets...thus, we create a new one each time
        pipeline.addLast("framer", new DelimiterBasedFrameDecoder(...));

        // add string encoder (downstream) /decoders (upstream)
        pipeline.addLast("decoder", STR_DECODER);
        pipeline.addLast("encoder", STR_ENCODER);

        // add business logic (upstream)
        pipeline.addLast("handler", APP_HANDLER);

        // return pipeline instance
        return pipeline;
    }
}


You then register this factory in your application with Netty via:

ServerBootstrap bootstrap = new ServerBootstrap(...);

bootstrap.setPipelineFactory(new MyPipelineFactory());

*In Netty 4, you can do the same with the following Java snippet:*

// stateless codec instances, shared across channels (fields of the enclosing class)
private static final ProtobufEncoder PROTOBUF_ENCODER = new ProtobufEncoder();
private static final ProtobufDecoder PROTOBUF_DECODER = new ProtobufDecoder(RequestProtos.Request.getDefaultInstance());
private static final ProtobufVarint32LengthFieldPrepender PROTOBUF_FRAME_ENCODER = new ProtobufVarint32LengthFieldPrepender();

serverBootStrap....childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    public void initChannel(SocketChannel ch) throws Exception {
        ChannelPipeline p = ch.pipeline();
        // the frame decoder is stateful, so every channel gets its own instance
        p.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());
        p.addLast("protobufDecoder", PROTOBUF_DECODER);
        p.addLast("frameEncoder", PROTOBUF_FRAME_ENCODER);
        p.addLast("protobufEncoder", PROTOBUF_ENCODER);
        // the business handler is also created per channel
        p.addLast("bizHandler", new AgentCTSServerHandler());
    }
});
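
The ProtobufVarint32FrameDecoder and ProtobufVarint32LengthFieldPrepender used above frame each message with a Protobuf base-128 varint length prefix. The sketch below shows what such a prefix looks like on the wire, using only the JDK. `Varint32Sketch` and its methods are hypothetical names and are not Netty's implementation; the `-1` return value for an incomplete prefix is an assumption of this sketch and relies on lengths being non-negative.

```java
import java.io.ByteArrayOutputStream;
import java.nio.ByteBuffer;

// Hypothetical JDK-only illustration of a varint32 length prefix; not Netty code.
class Varint32Sketch {
    // Writes value as a base-128 varint: 7 bits per byte, low bits first,
    // with the high bit set on every byte except the last.
    static byte[] encode(int value) {
        ByteArrayOutputStream out = new ByteArrayOutputStream(5);
        while ((value & ~0x7F) != 0) {
            out.write((value & 0x7F) | 0x80);
            value >>>= 7;
        }
        out.write(value);
        return out.toByteArray();
    }

    // Reads a varint starting at the buffer's current position.
    // Returns -1 and restores the position if the prefix is still incomplete.
    static int decode(ByteBuffer in) {
        int start = in.position();
        int result = 0;
        for (int shift = 0; shift < 35; shift += 7) {
            if (!in.hasRemaining()) {
                in.position(start);
                return -1;
            }
            byte b = in.get();
            result |= (b & 0x7F) << shift;
            if ((b & 0x80) == 0) {
                return result;
            }
        }
        throw new IllegalArgumentException("malformed varint32");
    }
}
```

For example, a message length of 300 is encoded as the two bytes `0xAC 0x02`, while lengths below 128 take a single byte. Because the prefix itself has a variable size, the decoder has to cope with a prefix that is split across TCP fragments, which is why the frame decoder is stateful and created per channel.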


You can now properly handle incoming/outgoing data with any number of stateless or stateful handlers. My best practice is to always create a custom pipeline factory in order to fully control the handler instances. In my next post, I will actually show how
powerful pipeline factories can be by using pools to conserve memory with stateful handlers.

Now that we know how to construct pipelines and add handlers to them, how do we actually create usable handlers? First, you must decide what type of handler (upstream/downstream) it is and what its purpose will be. Netty comes with several built-in handlers and built-in
handler helper classes, so always look at the API documentation first before rolling your own. In general, handlers fall into a common set of categories that I will get to in a second. But first, let’s look at a very basic handler.

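import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.MessageEvent;
import org.jboss.netty.channel.SimpleChannelHandler;

@ChannelPipelineCoverage("all")
public class MyAppHandler extends SimpleChannelHandler
{
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent event)
        throws Exception
    {
        // only handle messages that were already decoded into a MyPojo
        if (event.getMessage() instanceof MyPojo)
        {
            MyPojo pojo = (MyPojo) event.getMessage();
            // write the response back on the same channel
            event.getChannel().write(pojo.getResponse());
        }
    }
}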


This is an overly simplified example. For more information, consult the Netty Guide. The first thing to notice is the annotation, which is a little confusing when first getting started with Netty. The annotation describes this
handler as being stateful (a value of “one”) or stateless (a value of “all”); that is, it states the level of coverage that a pipeline can use (all or one). However, this annotation is at the moment purely for descriptive and documentation purposes. Netty will
look for the annotation at runtime to ensure you properly described the handler, but it does not enforce statefulness or statelessness on the handler. That is the purpose of the pipeline factory that we discussed earlier. In Netty 4, this annotation no longer exists: a handler
that is safe to share across pipelines is marked with @ChannelHandler.Sharable instead, and Netty refuses to add an unmarked handler instance to more than one pipeline. In a future post, I will show how you
can extend this annotation and use a custom pipeline factory to auto-process and control handlers.

The following are common categories or groupings of handlers and how to generally process them. Note that these are just samples of use cases.

Framing

Framing handlers are used to join one or more fragmented requests into a single request that other handlers can process. This is generally near the top of the stack and allows all other handlers to process a given request as a single request even though it
may actually be several requests. Netty provides both built-in framers as well as a helper parent class. The most popular built-in framer is the delimiter-based framer that searches for a particular delimiter in a message and passes each delimited section
up the stream.

pipeline.addLast(
    "framer",
    new DelimiterBasedFrameDecoder(
        // 512 is the maximum frame length; a longer frame is rejected with a TooLongFrameException
        // Delimiters is a static class that provides several built-in delimiters
        512, Delimiters.lineDelimiter()
    )
);


The DelimiterBasedFrameDecoder is a subclass of the FrameDecoder implementation. FrameDecoder is the main parent class you normally use to build custom framers. For example, you may want to build a framer for a binary protocol where the first 2 bytes represent
the length of the message. You would implement such functionality via (note: this is just for example purposes as Netty provides a built-in version):

import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.handler.codec.frame.FrameDecoder;

public class BinaryDecoder extends FrameDecoder
{
    @Override
    protected Object decode(ChannelHandlerContext ctx, Channel channel,
                            ChannelBuffer buffer)
        throws Exception
    {
        // wait until the length prefix is available.
        if (buffer.readableBytes() < 2) {
            // return null to inform frame decoder that frame is not yet complete and
            // to continue reading data
            return null;
        }
        // peek at the length field (unsigned) without moving the reader index
        int dataLength = buffer.getUnsignedShort(buffer.readerIndex());
        // wait until the whole data is available.
        if (buffer.readableBytes() < dataLength + 2) {
            return null;
        }
        // skip the length field because we know it already.
        buffer.skipBytes(2);
        // forward exactly one frame; any bytes after it stay buffered for the next call
        return buffer.readBytes(dataLength);
    }
}


Note that framing handlers are always stateful. As such the parent FrameDecoder class already contains the proper annotation, so you do not need to re-define it.

Encoders/Decoders

Encoders and decoders are used to take a POJO object and convert to and from a ChannelBuffer. These handlers allow higher up handlers such as business logic handlers to use POJO objects without having to rely on channel buffers. Netty includes some pre-built
encoders, such as StringEncoder and StringDecoder. These handlers are generally stateless as they only need to take a complete set of data and convert to another POJO class. Since framers are used prior, we can be assured that the data is a complete set.
Other examples of encoders/decoders are Java serialization (built-in), text-based to custom POJO, binary to custom POJO, etc.
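
As an illustration of the "binary to custom POJO" case, here is a minimal JDK-only sketch of a stateless codec pair. It uses java.nio.ByteBuffer in place of Netty's buffer type, and the `MyPojo` fields and the wire layout (a 4-byte id followed by a UTF-8 name) are assumptions made up for this example.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical POJO and codec for illustration; not part of Netty.
class PojoCodecSketch {
    static final class MyPojo {
        final int id;
        final String name;

        MyPojo(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Assumed layout: 4-byte id, then the UTF-8 bytes of the name.
    static ByteBuffer encode(MyPojo pojo) {
        byte[] name = pojo.name.getBytes(StandardCharsets.UTF_8);
        // The buffer is sized up front, so it never has to grow.
        ByteBuffer out = ByteBuffer.allocate(4 + name.length);
        out.putInt(pojo.id).put(name);
        out.flip();
        return out;
    }

    // Expects exactly one complete frame, as produced by a framer earlier in the pipeline.
    static MyPojo decode(ByteBuffer frame) {
        int id = frame.getInt();
        byte[] name = new byte[frame.remaining()];
        frame.get(name);
        return new MyPojo(id, new String(name, StandardCharsets.UTF_8));
    }
}
```

Both methods are static and keep no state between calls, which is what makes a codec like this safe to share across connections. The decoder can afford to be this simple only because it assumes the framer has already delivered a complete frame.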

Encoders can be created by extending the OneToOneEncoder class and overriding the encode method. Decoders can be created by extending the OneToOneDecoder class and overriding the decode method. If you wish to provide both functionality, use the following
as an example:

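import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.ChannelDownstreamHandler;
import org.jboss.netty.channel.ChannelEvent;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelPipelineCoverage;
import org.jboss.netty.channel.ChannelUpstreamHandler;
import org.jboss.netty.channel.Channels;
import org.jboss.netty.channel.MessageEvent;

@ChannelPipelineCoverage("all")
public class MultiHandler
    implements ChannelUpstreamHandler, ChannelDownstreamHandler
{
    public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent ce)
        throws Exception
    {
        // pass through anything that is not a message
        if (!(ce instanceof MessageEvent))
        {
            ctx.sendUpstream(ce);
            return;
        }

        MessageEvent me = (MessageEvent) ce;
        if (!(me.getMessage() instanceof ChannelBuffer))
        {
            ctx.sendUpstream(ce);
            return;
        }

        Channels.fireMessageReceived(ctx, decode(ctx, (ChannelBuffer) me.getMessage()));
    }

    public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent ce)
        throws Exception
    {
        // pass through anything that is not a message
        if (!(ce instanceof MessageEvent))
        {
            ctx.sendDownstream(ce);
            return;
        }

        MessageEvent me = (MessageEvent) ce;
        if (!(me.getMessage() instanceof MyPojo))
        {
            // outgoing events must keep travelling downstream
            ctx.sendDownstream(ce);
            return;
        }

        Channels.write(ctx, me.getFuture(), encode(ctx, (MyPojo) me.getMessage()));
    }

    protected MyPojo decode(ChannelHandlerContext ctx, ChannelBuffer buffer)
        throws Exception
    {
        // decode channel buffer into a POJO and return the POJO
        throw new UnsupportedOperationException("decoding is protocol-specific");
    }

    protected ChannelBuffer encode(ChannelHandlerContext ctx, MyPojo pojo)
        throws Exception
    {
        // encode the POJO into a ChannelBuffer
        // ie:  ChannelBuffer buffer = ChannelBuffers.buffer(512);
        throw new UnsupportedOperationException("encoding is protocol-specific");
    }
}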


One thing to note when creating buffers in encoders is to size them with an estimated length, so that the buffer does not have to grow repeatedly. It is often faster to create one 512-byte buffer than four 128-byte buffers. To do
so, use the ChannelBuffers.buffer(int) static method for a fixed capacity, or ChannelBuffers.dynamicBuffer(int) with the estimated length if the buffer may still need to grow.

Manipulating Handlers

Manipulating handlers are used to take a given buffer, manipulate it, and return a different buffer. They are similar to framers and decoders, but generally serve another purpose such as compression or encryption. These handlers can choose to either extend
SimpleChannelHandler and override messageReceived and/or writeRequested or can implement the ChannelUpstreamHandler and/or ChannelDownstreamHandler and implement the associated callback. In either case, you should manipulate the data as necessary, and then
pass the message on by creating a new event. The following is a base example:

public void handleUpstream(ChannelHandlerContext ctx, ChannelEvent ce)
{
    if (!(ce instanceof MessageEvent))
    {
        ctx.sendUpstream(ce);
        return;
    }

    MessageEvent me = (MessageEvent) ce;

    // process me.getMessage as either ChannelBuffer, POJO, etc
    // convert to a new object or ChannelBuffer
    Object newObject = me.getMessage(); // placeholder: replace with the converted object

    // send new message (static helper on org.jboss.netty.channel.Channels)
    Channels.fireMessageReceived(ctx, newObject);
}

public void handleDownstream(ChannelHandlerContext ctx, ChannelEvent ce)
{
    if (!(ce instanceof MessageEvent))
    {
        ctx.sendDownstream(ce);
        return;
    }

    MessageEvent me = (MessageEvent) ce;

    // process me.getMessage as either ChannelBuffer, POJO, etc
    // convert to a new object or ChannelBuffer
    Object newObject = me.getMessage(); // placeholder: replace with the converted object

    // write new message (static helper on org.jboss.netty.channel.Channels)
    Channels.write(ctx, me.getFuture(), newObject);
}


Business Logic Handlers

The final example/use case of handlers is business logic handlers. These are generally the last handler in the pipeline and perform the actual business logic. Because of the prior framers and decoders, these handlers can use a POJO and process accordingly.
As a result, they tend to write data in response to a message. Again, because of the prior encoders, the handler can write a POJO and know that it will get turned into a buffer automatically. Handlers generally extend SimpleChannelHandler such as:

@ChannelPipelineCoverage("one")
public class MyHandler extends SimpleChannelHandler
{
    @Override
    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
        throws Exception
    {
        if (e.getMessage() instanceof MyPojo)
        {
            MyPojo pojo = (MyPojo) e.getMessage();
            e.getChannel().write(pojo.getPojoResult());
        }
    }
}


These handlers will generally be the most widely used for a given application and contain the most logic. It is best to abstract the actual business logic into separate classes so that they are re-usable and contain no dependency on Netty. In general, they
should just take a POJO and output a POJO. The business handler is then a simple class that gets the POJO from the message event, invokes an independent business operation, gets the result, and writes the result to the channel.
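
The separation described above might look roughly like the following JDK-only sketch. All names (`Request`, `Response`, `GreetingService`, `HandlerAdapter`) are hypothetical, and a `Consumer<Object>` stands in for the channel write so that the example has no dependency on Netty.

```java
import java.util.function.Consumer;

// Hypothetical names throughout; nothing in this sketch depends on Netty.
class BusinessLogicSketch {
    static final class Request {
        final String name;

        Request(String name) {
            this.name = name;
        }
    }

    static final class Response {
        final String text;

        Response(String text) {
            this.text = text;
        }
    }

    // The business operation: takes a POJO, returns a POJO.
    interface GreetingService {
        Response handle(Request request);
    }

    // Thin adapter standing in for the network handler.
    static final class HandlerAdapter {
        private final GreetingService service;
        private final Consumer<Object> channelWrite; // stands in for writing to the channel

        HandlerAdapter(GreetingService service, Consumer<Object> channelWrite) {
            this.service = service;
            this.channelWrite = channelWrite;
        }

        // Unwrap the message, delegate to the service, write the result.
        void messageReceived(Object message) {
            if (message instanceof Request) {
                channelWrite.accept(service.handle((Request) message));
            }
        }
    }
}
```

Because `GreetingService` knows nothing about channels or events, it can be unit-tested and re-used outside the server, and the adapter stays small enough to swap when the networking layer changes.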

We now have the tools necessary to create custom handlers and the custom pipeline factory that uses the handlers. These handlers are generally simple to write and often times already exist within Netty. As a result, you can concentrate on code that really
matters: the business logic. If ever you need to change an implementation, just replace the handler in the pipeline and everything else remains untouched resulting in a maintainable and efficient system and all at little cost to performance and scalability.
However, remember that Netty is meant as a scalable server and so handlers should try to be as optimized and simple as possible. Leave the complexities to business logic.

In Netty 4, if a custom frame decoder is required, then one needs to be careful when implementing one with ByteToMessageDecoder.
Ensure there are enough bytes in the buffer for a complete frame by checking ByteBuf.readableBytes(). If there are not enough bytes for a complete frame, return without modifying the reader index to allow more bytes to arrive.

To check for complete frames without modifying the reader index, use methods like ByteBuf.getInt(int). One **MUST** use the reader index when using methods like ByteBuf.getInt(int). For example, calling in.getInt(0)
assumes the frame starts at the beginning of the buffer, which is not always the case. Use in.getInt(in.readerIndex()) instead.
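
The same rule can be demonstrated with the JDK's java.nio.ByteBuffer, whose position() plays a role comparable to the reader index of ByteBuf. The sketch below is not Netty code: `LengthPeekSketch` and `tryDecode` are hypothetical names, and the 4-byte big-endian length prefix is an assumed frame layout.

```java
import java.nio.ByteBuffer;

// Hypothetical JDK-only illustration; ByteBuffer.position() stands in for the reader index.
class LengthPeekSketch {
    // Returns the next complete frame payload, or null if more bytes are needed.
    // Assumed layout: 4-byte big-endian length, then that many payload bytes.
    static byte[] tryDecode(ByteBuffer in) {
        if (in.remaining() < 4) {
            return null; // length prefix not complete yet
        }
        // Absolute read at the current position: peeks without moving the position.
        int length = in.getInt(in.position());
        if (length < 0) {
            throw new IllegalArgumentException("negative length");
        }
        if (in.remaining() - 4 < length) {
            return null; // position untouched, so the caller can retry later
        }
        in.getInt(); // now actually consume the prefix
        byte[] payload = new byte[length];
        in.get(payload);
        return payload;
    }
}
```

When two frames sit back to back in one buffer, the second call starts at position 6 rather than 0. Peeking with `in.getInt(0)` there would read the first frame's length again, which is the mistake the paragraph above warns about.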