您的位置:首页 > 其它

netty:使用ChannelDuplexHandler 来接收、下发数据

2018-01-05 11:43 323 查看

项目背景

因为需求是和硬件对接,需要定时对硬件设备进行检查,因此决定使用netty作为通信中间件。使用netty的ChannelDuplexHandler 来接收、下发硬件数据。

硬件通过TCP长连接向服务端发送指令,服务端使用netty监听固定端口,接收并处理指令。

硬件发送的是16进制字节流,使用netty的ByteArrayDecoder、ByteArrayEncoder 对数据进行编码处理。

因为要处理TCP粘包的问题,所以同硬件约定在传送数据的末尾加上两对\r\n用于区分。因此,netty中我用了DelimiterBasedFrameDecoder 对数据做截取。

Netty版本:netty-all-4.0.46.Final

<!-- Maven POM-->
<!-- https://mvnrepository.com/artifact/io.netty/netty-all -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.46.Final</version>
</dependency>


服务端代码

/**
* 读操作的超时时间,0表示不监控
*/
private Integer readerTimeout = 720;

/**
* 写操作的超时时间,0表示不监控
*/
private Integer writeTimeout = 0;

/**
* 写操作的超时时间,0表示不监控
*/
private Integer allTimeout = 0;

/**
* listen链接
*/
private Channel channel;

private final static String DISCONNECT = "disconnect";
private final static String STOP = "server_stop";
private ServerBootstrap strap;

@Resource
private DeviceHandler handler;

@Value("${netty.server.port}")
private int port;

/**
* 启动netty程序
*
* @throws Exception
*/
public void start() throws Exception {
logger.info("netty server start !");
EventLoopGroup bossGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("bossGroup"));
EventLoopGroup workGroup = new NioEventLoopGroup(0, new DefaultThreadFactory("workGroup"));
logger.info("connection is already!");
this.strap = new ServerBootstrap();
strap.group(bossGroup, workGroup).channel(NioServerSocketChannel.class)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new DelimiterBasedFrameDecoder(1024 * 2, Unpooled.copiedBuffer("\r\n\r\n".getBytes())));
ch.pipeline().addLast("byteArrayDecoder", new ByteArrayDecoder());
ch.pipeline().addLast("byteArrayEncoder", new ByteArrayEncoder());
ch.pipeline().addLast("idleState", new IdleStateHandler(readerTimeout, writeTimeout, allTimeout));
ch.pipeline().addLast(handler);
}
}).option(ChannelOption.SO_BACKLOG, 128).option(ChannelOption.SO_LINGER, 0)
.childOption(ChannelOption.SO_KEEPALIVE, true);
channel = strap.bind(port).sync().channel();
}

/**
* 关闭netty程序
*/
public void stop() {
if (channel != null) {
channel.close();
}
logger.info("disconnected: server stop");
for (DeviceChannel dc : DeviceHandler.getChannels()) {
if (dc != null) {
String deviceId = dc.getDevice() == null ? DeviceHandler.E0E0E0 : dc.getDevice().getDeviceId();
AddressDto addressDto = StringParamUtil.getAddressInfo(dc.getChannel().remoteAddress().toString());
DataCollector.collectConnectInfo(deviceId, addressDto.getIp(), addressDto.getPort(),
dc.getChannel().toString(), NetUtil.hostName(), new Date(), STOP, DISCONNECT);
if (dc.getState() == AbstractChannel.DeviceState.CONNECTED){
dc.disconnect();
}
dc.closeChannel();
}
}
if (strap != null) {
strap.group().shutdownGracefully();
strap.childGroup().shutdownGracefully();
}
DeviceHandler.getChannels().clear();
MDC.remove(ConstantUtil.WTRACEID);
}


<!-- 服务端配置了两个方法,分别用于netty的启动和停止。这两个方法使用spring来调用,如下所示: -->
<bean class="***.***.***" init-method="start" destroy-method="stop"/>


业务代码

我们在服务中用到的DeviceHandler,其实是继承自Netty的

ChannelDuplexHandler,其中核心的两个方法是channelReadwrite。前者用于接收服务器发来的数据,后者用于向服务器发送指令。代码实例如下:

@Component
@ChannelHandler.Sharable
public class DeviceHandler extends ChannelDuplexHandler {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//在这里可以处理硬件发送过来的数据
logger.debug("数据对象长度:" + ((byte[]) msg).length);
}
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
}
}


下面是DeviceHandler 的升级版。当硬件与服务器建立一条链接(channel),我们将活动链接存储到Map中,失效的链接则从map中移除。

下面的代码用到了DeviceChannel ,它其实是继承自AbstractChannel ,我们对于每个硬件的管理便是依赖于它。

在我们的项目中会将硬件发来的数据传递给DeviceChannel,之后使用DeviceChannel将数据转发给其他系统。

@Component
@ChannelHandler.Sharable
public class DeviceHandler extends ChannelDuplexHandler {
private static final Map<SocketAddress, DeviceChannel> channels = new ConcurrentHashMap<>();

@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
DeviceChannel dc = channels.get(ctx.channel().remoteAddress());
if (dc == null) {
dc = new DeviceChannel();
dc.init();
channels.put(ctx.channel().remoteAddress(), dc);
}
}
@Override
public void channelInactive(ChannelHandlerContext ctx) throws Exception {
DeviceChannel dc = channels.remove(ctx.channel().remoteAddress());
if (dc != null && dc.getState() == DeviceState.CONNECTED) {
dc.disconnect();
}
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
//在这里可以处理硬件发送过来的数据
logger.debug("数据对象长度:" + ((byte[]) msg).length);
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
super.write(ctx, msg, promise);
}
}


最后

因为这是成熟的商业项目,所以更详细的代码就不方便透露了。

总的来说,设计思路就是使用DeviceHandler重写ChannelDuplexHandlerchannelRead、write方法,使用它们来处理硬件数据的接收、下发等操作。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息