spring boot 集成原生netty(非netty-io-socket)
2018-03-26 20:37
661 查看
Spring Boot 集成 Netty,先上配置: Yml代码
tcp:
  port: 8090
boss:
  thread:
    count: 2
worker:
  thread:
    count: 2
so:
  keepalive: true
  backlog: 100
很简单的配置,端口开放为8090 再上配置类: Java代码
package com.fengbaogu.config;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import com.fengbaogu.handlers.StringProtocolInitalizer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Spring configuration for the Netty TCP server.
 *
 * <p>Declares the event loop groups, the {@link ServerBootstrap}, the bind
 * address and the string codec beans. Tunables are injected from the
 * properties {@code tcp.port}, {@code boss.thread.count},
 * {@code worker.thread.count}, {@code so.keepalive} and {@code so.backlog}.
 */
@Configuration
public class Config {

    @Value("${boss.thread.count}")
    private int bossCount;

    @Value("${worker.thread.count}")
    private int workerCount;

    @Value("${tcp.port}")
    private int tcpPort;

    @Value("${so.keepalive}")
    private boolean keepAlive;

    @Value("${so.backlog}")
    private int backlog;

    @Autowired
    @Qualifier("springProtocolInitializer")
    private StringProtocolInitalizer protocolInitalizer;

    /**
     * Builds the server bootstrap: NIO transport, boss/worker groups, the
     * per-connection pipeline initializer and the socket options.
     *
     * @return the configured (not yet bound) bootstrap
     */
    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(protocolInitalizer)
                // SO_BACKLOG belongs to the listening socket, so it is a
                // server-channel option.
                .option(ChannelOption.SO_BACKLOG, backlog)
                // SO_KEEPALIVE only applies to connected sockets. Setting it
                // with option() targets the listening channel, where it is
                // unsupported and never reaches accepted connections, so it
                // must be a child option.
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
        return b;
    }

    /**
     * Event loop group passed as the parent (acceptor) group to the bootstrap.
     *
     * @return a group with {@code boss.thread.count} threads, shut down
     *         gracefully when the bean is destroyed
     */
    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    /**
     * Event loop group passed as the child (per-connection) group to the bootstrap.
     *
     * @return a group with {@code worker.thread.count} threads, shut down
     *         gracefully when the bean is destroyed
     */
    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    /**
     * Address the server binds to.
     *
     * @return a wildcard-address socket address on {@code tcp.port}
     */
    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    /**
     * Configured socket option values, keyed by Netty option.
     *
     * <p>Kept as a bean for code that injects {@code tcpChannelOptions};
     * {@link #bootstrap()} applies the same values directly so that each one
     * lands on the correct channel (server vs. accepted connection).
     *
     * @return a mutable map holding SO_KEEPALIVE and SO_BACKLOG
     */
    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    /**
     * @return the encoder bean used for outbound {@code String} messages
     */
    @Bean(name = "stringEncoder")
    public StringEncoder stringEncoder() {
        return new StringEncoder();
    }

    /**
     * @return the decoder bean used for inbound bytes
     */
    @Bean(name = "stringDecoder")
    public StringDecoder stringDecoder() {
        return new StringDecoder();
    }

    /**
     * Necessary to make the Value annotations work.
     *
     * @return the placeholder configurer that resolves {@code ${...}} expressions
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
其中:Java代码
options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
该选项用于开启TCP保活(keep-alive)探测:连接长时间空闲时,由操作系统发送探测包来检测对端是否仍然存活;在服务正常、双方都不主动关闭的情况下,连接会一直保持。 基本配置已经OK,现在配置字符串的协议:Java代码
package com.fengbaogu.handlers;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
 * Channel initializer for a minimal string-based protocol: every accepted
 * connection gets a decoder, the application handler and an encoder, in that
 * order. The same injected instances are installed in every pipeline.
 *
 * @author Abraham Menacherry
 */
@Component
@Qualifier("springProtocolInitializer")
public class StringProtocolInitalizer extends ChannelInitializer<SocketChannel> {

    @Autowired
    StringDecoder stringDecoder;

    @Autowired
    StringEncoder stringEncoder;

    @Autowired
    ServerHandler serverHandler;

    /**
     * Populates the pipeline of a newly accepted channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("decoder", stringDecoder)
                .addLast("handler", serverHandler)
                .addLast("encoder", stringEncoder);
    }

    // --- accessors ---------------------------------------------------------

    public StringDecoder getStringDecoder() {
        return this.stringDecoder;
    }

    public StringEncoder getStringEncoder() {
        return this.stringEncoder;
    }

    public ServerHandler getServerHandler() {
        return this.serverHandler;
    }

    public void setStringDecoder(StringDecoder stringDecoder) {
        this.stringDecoder = stringDecoder;
    }

    public void setStringEncoder(StringEncoder stringEncoder) {
        this.stringEncoder = stringEncoder;
    }

    public void setServerHandler(ServerHandler serverHandler) {
        this.serverHandler = serverHandler;
    }
}
此配置,可直接解码字符串数据,及自己的处理器逻辑 现在我们来上自己的处理器逻辑: Java代码
package com.fengbaogu.handlers;
import java.net.InetAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Application handler for the string protocol.
 *
 * <p>Greets each client on connect, echoes every received message back, and
 * closes the connection when a message contains {@code "bye"}. Declared
 * {@code @Sharable} because a single Spring-managed instance is added to
 * every channel pipeline; it keeps no per-connection state.
 */
@Component
@Qualifier("serverHandler")
@Sharable
public class ServerHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);

    /**
     * Logs the message and the channel ids, then either closes the channel
     * (message contains "bye") or echoes the message back.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded inbound text
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg)
            throws Exception {
        log.info("client msg:{}", msg);
        log.info("client long id:{}", ctx.channel().id().asLongText());
        log.info("client short id:{}", ctx.channel().id().asShortText());

        if (msg.contains("bye")) {
            // close
            ctx.channel().close();
        } else {
            // send to client; written via the channel so the write traverses
            // the whole outbound pipeline, including the encoder that is
            // registered after this handler
            ctx.channel().writeAndFlush("Yoru msg is:" + msg);
        }
    }

    /**
     * Logs the new connection and sends the welcome banner.
     *
     * @param ctx context of the channel that became active
     * @throws Exception if the local host name cannot be resolved
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("RemoteAddress : {} active !", ctx.channel().remoteAddress());
        ctx.channel().writeAndFlush("Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
        super.channelActive(ctx);
    }

    /**
     * Logs the failure through the class logger (instead of dumping the stack
     * trace to stderr) and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Unhandled exception on channel {}, closing connection",
                ctx.channel().remoteAddress(), cause);
        ctx.close();
    }

    /**
     * Logs that the connection went away.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("\nChannel is disconnected");
        super.channelInactive(ctx);
    }
}
逻辑和简单的业务已经实现。现在我们来配置netty的启动类。 Java代码
package com.fengbaogu.config;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
/**
 * Lifecycle bean that binds the Netty server when the Spring context starts
 * and closes the listening channel when the context shuts down.
 */
@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap b;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /** Result of the bind; its channel is the listening server channel. */
    private ChannelFuture serverChannelFuture;

    /**
     * Binds the server socket and waits until the bind has completed.
     *
     * @throws Exception if the bind fails or the wait is interrupted
     */
    @PostConstruct
    public void start() throws Exception {
        System.out.println("Starting server at " + tcpPort);
        serverChannelFuture = b.bind(tcpPort).sync();
    }

    /**
     * Closes the listening channel and waits for the close to finish.
     *
     * <p>The channel has to be closed actively: {@code closeFuture()} only
     * reports when the channel has been closed by someone else, so waiting on
     * it here would block context shutdown.
     *
     * @throws Exception if the close fails or the wait is interrupted
     */
    @PreDestroy
    public void stop() throws Exception {
        if (serverChannelFuture == null) {
            // stop() called without a successful start(): nothing to release
            return;
        }
        serverChannelFuture.channel().close().sync();
    }

    public ServerBootstrap getB() {
        return b;
    }

    public void setB(ServerBootstrap b) {
        this.b = b;
    }

    public InetSocketAddress getTcpPort() {
        return tcpPort;
    }

    public void setTcpPort(InetSocketAddress tcpPort) {
        this.tcpPort = tcpPort;
    }
}
利用@PostConstruct和@PreDestroy来启动和关闭Netty服务。利用telnet连接,我们来看效果:Java代码
telnet 192.168.1.44 8090
显示:Java代码
Connecting to 192.168.1.44:8090...
Connection established.
To escape to local shell, press 'Ctrl+Alt+]'.
Welcome to DESKTOP-QOTQUO6 service!
说明已经连接上了 各项测试通过,spring boot集成netty
tcp:
  port: 8090
boss:
  thread:
    count: 2
worker:
  thread:
    count: 2
so:
  keepalive: true
  backlog: 100
很简单的配置,端口开放为8090 再上配置类: Java代码
package com.fengbaogu.config;
import java.net.InetSocketAddress;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.support.PropertySourcesPlaceholderConfigurer;
import com.fengbaogu.handlers.StringProtocolInitalizer;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
/**
 * Spring configuration for the Netty TCP server.
 *
 * <p>Declares the event loop groups, the {@link ServerBootstrap}, the bind
 * address and the string codec beans. Tunables are injected from the
 * properties {@code tcp.port}, {@code boss.thread.count},
 * {@code worker.thread.count}, {@code so.keepalive} and {@code so.backlog}.
 */
@Configuration
public class Config {

    @Value("${boss.thread.count}")
    private int bossCount;

    @Value("${worker.thread.count}")
    private int workerCount;

    @Value("${tcp.port}")
    private int tcpPort;

    @Value("${so.keepalive}")
    private boolean keepAlive;

    @Value("${so.backlog}")
    private int backlog;

    @Autowired
    @Qualifier("springProtocolInitializer")
    private StringProtocolInitalizer protocolInitalizer;

    /**
     * Builds the server bootstrap: NIO transport, boss/worker groups, the
     * per-connection pipeline initializer and the socket options.
     *
     * @return the configured (not yet bound) bootstrap
     */
    @Bean(name = "serverBootstrap")
    public ServerBootstrap bootstrap() {
        ServerBootstrap b = new ServerBootstrap();
        b.group(bossGroup(), workerGroup())
                .channel(NioServerSocketChannel.class)
                .childHandler(protocolInitalizer)
                // SO_BACKLOG belongs to the listening socket, so it is a
                // server-channel option.
                .option(ChannelOption.SO_BACKLOG, backlog)
                // SO_KEEPALIVE only applies to connected sockets. Setting it
                // with option() targets the listening channel, where it is
                // unsupported and never reaches accepted connections, so it
                // must be a child option.
                .childOption(ChannelOption.SO_KEEPALIVE, keepAlive);
        return b;
    }

    /**
     * Event loop group passed as the parent (acceptor) group to the bootstrap.
     *
     * @return a group with {@code boss.thread.count} threads, shut down
     *         gracefully when the bean is destroyed
     */
    @Bean(name = "bossGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup bossGroup() {
        return new NioEventLoopGroup(bossCount);
    }

    /**
     * Event loop group passed as the child (per-connection) group to the bootstrap.
     *
     * @return a group with {@code worker.thread.count} threads, shut down
     *         gracefully when the bean is destroyed
     */
    @Bean(name = "workerGroup", destroyMethod = "shutdownGracefully")
    public NioEventLoopGroup workerGroup() {
        return new NioEventLoopGroup(workerCount);
    }

    /**
     * Address the server binds to.
     *
     * @return a wildcard-address socket address on {@code tcp.port}
     */
    @Bean(name = "tcpSocketAddress")
    public InetSocketAddress tcpPort() {
        return new InetSocketAddress(tcpPort);
    }

    /**
     * Configured socket option values, keyed by Netty option.
     *
     * <p>Kept as a bean for code that injects {@code tcpChannelOptions};
     * {@link #bootstrap()} applies the same values directly so that each one
     * lands on the correct channel (server vs. accepted connection).
     *
     * @return a mutable map holding SO_KEEPALIVE and SO_BACKLOG
     */
    @Bean(name = "tcpChannelOptions")
    public Map<ChannelOption<?>, Object> tcpChannelOptions() {
        Map<ChannelOption<?>, Object> options = new HashMap<ChannelOption<?>, Object>();
        options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
        options.put(ChannelOption.SO_BACKLOG, backlog);
        return options;
    }

    /**
     * @return the encoder bean used for outbound {@code String} messages
     */
    @Bean(name = "stringEncoder")
    public StringEncoder stringEncoder() {
        return new StringEncoder();
    }

    /**
     * @return the decoder bean used for inbound bytes
     */
    @Bean(name = "stringDecoder")
    public StringDecoder stringDecoder() {
        return new StringDecoder();
    }

    /**
     * Necessary to make the Value annotations work.
     *
     * @return the placeholder configurer that resolves {@code ${...}} expressions
     */
    @Bean
    public static PropertySourcesPlaceholderConfigurer propertyPlaceholderConfigurer() {
        return new PropertySourcesPlaceholderConfigurer();
    }
}
其中:Java代码
options.put(ChannelOption.SO_KEEPALIVE, keepAlive);
该选项用于开启TCP保活(keep-alive)探测:连接长时间空闲时,由操作系统发送探测包来检测对端是否仍然存活;在服务正常、双方都不主动关闭的情况下,连接会一直保持。 基本配置已经OK,现在配置字符串的协议:Java代码
package com.fengbaogu.handlers;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelPipeline;
import io.netty.channel.socket.SocketChannel;
import io.netty.handler.codec.string.StringDecoder;
import io.netty.handler.codec.string.StringEncoder;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
/**
 * Channel initializer for a minimal string-based protocol: every accepted
 * connection gets a decoder, the application handler and an encoder, in that
 * order. The same injected instances are installed in every pipeline.
 *
 * @author Abraham Menacherry
 */
@Component
@Qualifier("springProtocolInitializer")
public class StringProtocolInitalizer extends ChannelInitializer<SocketChannel> {

    @Autowired
    StringDecoder stringDecoder;

    @Autowired
    StringEncoder stringEncoder;

    @Autowired
    ServerHandler serverHandler;

    /**
     * Populates the pipeline of a newly accepted channel.
     *
     * @param ch the channel being initialized
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        ch.pipeline()
                .addLast("decoder", stringDecoder)
                .addLast("handler", serverHandler)
                .addLast("encoder", stringEncoder);
    }

    // --- accessors ---------------------------------------------------------

    public StringDecoder getStringDecoder() {
        return this.stringDecoder;
    }

    public StringEncoder getStringEncoder() {
        return this.stringEncoder;
    }

    public ServerHandler getServerHandler() {
        return this.serverHandler;
    }

    public void setStringDecoder(StringDecoder stringDecoder) {
        this.stringDecoder = stringDecoder;
    }

    public void setStringEncoder(StringEncoder stringEncoder) {
        this.stringEncoder = stringEncoder;
    }

    public void setServerHandler(ServerHandler serverHandler) {
        this.serverHandler = serverHandler;
    }
}
此配置,可直接解码字符串数据,及自己的处理器逻辑 现在我们来上自己的处理器逻辑: Java代码
package com.fengbaogu.handlers;
import java.net.InetAddress;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.channel.ChannelHandler.Sharable;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
/**
 * Application handler for the string protocol.
 *
 * <p>Greets each client on connect, echoes every received message back, and
 * closes the connection when a message contains {@code "bye"}. Declared
 * {@code @Sharable} because a single Spring-managed instance is added to
 * every channel pipeline; it keeps no per-connection state.
 */
@Component
@Qualifier("serverHandler")
@Sharable
public class ServerHandler extends SimpleChannelInboundHandler<String> {

    private static final Logger log = LoggerFactory.getLogger(ServerHandler.class);

    /**
     * Logs the message and the channel ids, then either closes the channel
     * (message contains "bye") or echoes the message back.
     *
     * @param ctx context of the channel the message arrived on
     * @param msg decoded inbound text
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, String msg)
            throws Exception {
        log.info("client msg:{}", msg);
        log.info("client long id:{}", ctx.channel().id().asLongText());
        log.info("client short id:{}", ctx.channel().id().asShortText());

        if (msg.contains("bye")) {
            // close
            ctx.channel().close();
        } else {
            // send to client; written via the channel so the write traverses
            // the whole outbound pipeline, including the encoder that is
            // registered after this handler
            ctx.channel().writeAndFlush("Yoru msg is:" + msg);
        }
    }

    /**
     * Logs the new connection and sends the welcome banner.
     *
     * @param ctx context of the channel that became active
     * @throws Exception if the local host name cannot be resolved
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        log.info("RemoteAddress : {} active !", ctx.channel().remoteAddress());
        ctx.channel().writeAndFlush("Welcome to " + InetAddress.getLocalHost().getHostName() + " service!\n");
        super.channelActive(ctx);
    }

    /**
     * Logs the failure through the class logger (instead of dumping the stack
     * trace to stderr) and closes the connection.
     *
     * @param ctx   context of the failing channel
     * @param cause the exception raised in the pipeline
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        log.error("Unhandled exception on channel {}, closing connection",
                ctx.channel().remoteAddress(), cause);
        ctx.close();
    }

    /**
     * Logs that the connection went away.
     *
     * @param ctx context of the channel that became inactive
     */
    @Override
    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
        log.info("\nChannel is disconnected");
        super.channelInactive(ctx);
    }
}
逻辑和简单的业务已经实现。现在我们来配置netty的启动类。 Java代码
package com.fengbaogu.config;
import java.net.InetSocketAddress;
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
/**
 * Lifecycle bean that binds the Netty server when the Spring context starts
 * and closes the listening channel when the context shuts down.
 */
@Component
public class TCPServer {

    @Autowired
    @Qualifier("serverBootstrap")
    private ServerBootstrap b;

    @Autowired
    @Qualifier("tcpSocketAddress")
    private InetSocketAddress tcpPort;

    /** Result of the bind; its channel is the listening server channel. */
    private ChannelFuture serverChannelFuture;

    /**
     * Binds the server socket and waits until the bind has completed.
     *
     * @throws Exception if the bind fails or the wait is interrupted
     */
    @PostConstruct
    public void start() throws Exception {
        System.out.println("Starting server at " + tcpPort);
        serverChannelFuture = b.bind(tcpPort).sync();
    }

    /**
     * Closes the listening channel and waits for the close to finish.
     *
     * <p>The channel has to be closed actively: {@code closeFuture()} only
     * reports when the channel has been closed by someone else, so waiting on
     * it here would block context shutdown.
     *
     * @throws Exception if the close fails or the wait is interrupted
     */
    @PreDestroy
    public void stop() throws Exception {
        if (serverChannelFuture == null) {
            // stop() called without a successful start(): nothing to release
            return;
        }
        serverChannelFuture.channel().close().sync();
    }

    public ServerBootstrap getB() {
        return b;
    }

    public void setB(ServerBootstrap b) {
        this.b = b;
    }

    public InetSocketAddress getTcpPort() {
        return tcpPort;
    }

    public void setTcpPort(InetSocketAddress tcpPort) {
        this.tcpPort = tcpPort;
    }
}
利用@PostConstruct和@PreDestroy来启动和关闭Netty服务。利用telnet连接,我们来看效果:Java代码
telnet 192.168.1.44 8090
显示:Java代码
Connecting to 192.168.1.44:8090...
Connection established.
To escape to local shell, press 'Ctrl+Alt+]'.
Welcome to DESKTOP-QOTQUO6 service!
说明已经连接上了 各项测试通过,spring boot集成netty
相关文章推荐
- Spring Boot实战之netty-socketio实现简单聊天室(给指定用户推送消息)
- Spring Boot实战之netty-socketio实现简单聊天室(给指定用户推送消息)
- Spring Boot实战之netty-socketio实现简单聊天室(给指定用户推送消息)
- Spring Boot使用Netty SocketIO实现WebIM功能
- Spring-boot集成Netty做websocket服务端
- springmvc+maven+netty-socketio服务端构建实时通信
- spring boot+mvc+mybatis+netty-sokey.io+html+js实现简单即时通讯聊天系统
- spring boot 2.0,netty,mybatis,mysql,redis,docker 集成
- spring boot 下集成netty socket.io
- springboot - 集成jdk 原生webservice
- springmvc+maven+netty-socketio服务端构建实时通信
- windows环境下springboot集成phoenix时报如下异常: java.io.IOException: Could not locate executable null\bin\winut
- spring boot 集成kafka (多线程,消费者使用kafka的原生api实现,因为@KakfkaListener修改groupId无效)
- 百度富文本编辑器Ueditor 集成springboot(不修改源码)
- spring boot与kafka集成的简单实例
- spring-boot集成mybatis
- (16)spring boot中集成Redis实例
- spring-boot集成EasyUI和KindEditor
- springboot集成swagger2,构建强大的RESTful API文档
- Spring boot 集成工作流flowable去掉xml配置