基于Netty写一个http协议的服务
2018-02-08 10:31
393 查看
第一步:引入netty的jar包,本文使用的是netty-all-4.1.9.Final.jar
第二步:创建netty服务
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Spring component that starts the Netty server when the bean is initialised and stops it when
 * the bean is destroyed.
 *
 * <p>The bind host and port are read from the {@code SOCKET.URL} and {@code SOCKET.PORT}
 * properties; per-connection pipeline setup is delegated to the injected
 * {@link SocketServerFilter}.
 */
@Component
@Lazy(false)
public class NettySocketServer {
    private static final Logger log = Log4jManager.get();

    @Value("${SOCKET.PORT}")
    private Integer socket_port;

    @Value("${SOCKET.URL}")
    private String socket_url;

    /** The bound server channel; stays {@code null} until {@link #init()} has bound successfully. */
    private Channel ch = null;

    EventLoopGroup bossGroup = null;
    EventLoopGroup workerGroup = null;

    @Autowired
    private SocketServerFilter socketServerFilter;

    /**
     * Creates the event-loop groups and binds the server socket.
     *
     * <p>If the calling thread is interrupted while waiting for the bind, the failure is logged,
     * the interrupt flag is restored and the event loops are shut down. Any other bind failure
     * propagates to the caller as before, but the event loops are shut down first so their
     * threads are not leaked.
     */
    @PostConstruct
    public void init() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(socket_url, socket_port)
                    .childHandler(socketServerFilter);
            ChannelFuture f = b.bind().sync();
            ch = f.channel();
            started = true;
            log.info("{} started and listen on {}", NettySocketServer.class.getName(), ch.localAddress());
            // Success is reported only when the bind really completed.
            log.info("启动websocket服务成功!!");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("启动websocket服务失败!!", e);
        } finally {
            if (!started) {
                shutdownEventLoops();
            }
        }
    }

    /**
     * Closes the server channel (if one was bound) and shuts down both event-loop groups.
     */
    @PreDestroy
    public void destory() {
        log.info("销毁websocket服务!!");
        if (ch != null) {
            ch.close();
        }
        shutdownEventLoops();
    }

    /** Requests a graceful shutdown of whichever event-loop groups have been created. */
    private void shutdownEventLoops() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
请求过滤并添加执行handler:
/**
 * Channel initializer that installs the HTTP handlers on every accepted connection.
 *
 * <p>The request path prefix comes from the {@code SOCKET.PATH} property and is handed, together
 * with the {@link MarketDataService}, to a fresh {@link NettyServerHandler} for each channel.
 */
@Component
@Lazy(false)
public class SocketServerFilter extends ChannelInitializer<SocketChannel> {
    @Value("${SOCKET.PATH}")
    private String socket_path;

    @Autowired
    private MarketDataService marketDataService;

    /**
     * Builds the pipeline: request decoder, response encoder, an aggregator limited to 65535
     * bytes of content, and finally the request handler.
     *
     * @param ch the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // A ChunkedWriteHandler ("http-chunked") is not installed in this pipeline.
        ch.pipeline()
                .addLast("decoder", new HttpRequestDecoder())
                .addLast("encoder", new HttpResponseEncoder())
                .addLast("aggregator", new HttpObjectAggregator(65535))
                .addLast("request-handler", new NettyServerHandler(socket_path, marketDataService));
    }
}
注入参数:
对应handler处理请求:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = Log4jManager.get();
private String result = "";
private String socket_path;
private MarketDataService marketDataService;
public NettyServerHandler(String socket_path, MarketDataService marketDataService) {
this.socket_path = socket_path;
this.marketDataService = marketDataService;
}
/*
* 收到消息时,返回信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof FullHttpRequest)) {
result = "未知请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
FullHttpRequest httpRequest = (FullHttpRequest) msg;
try {
String path = httpRequest.uri(); // 获取路径
System.out.println("请求path:" + path);
String body = getBody(httpRequest); // 获取参数
System.out.println("请求参数:" + body);
HttpMethod method = httpRequest.method();// 获取请求方法
// 如果不是这个路径,就直接返回错误
if (!socket_path.equalsIgnoreCase(path) && !HttpMethod.GET.equals(method)) {
result = "非法请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
if (path.equals("/favicon.ico")) {
return;
}
if (!path.startsWith(socket_path)) {
result = "请求参数有误";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
Map<String, String> parseRequest = parseRequest(httpRequest);
log.info("请求参数parseRequest:{}", parseRequest);
// 接受到的消息,做业务逻辑处理...
result = marketDataService.getMarketData(parseRequest);
System.out.println(result);
send(ctx, result, HttpResponseStatus.OK);
return;
} catch (Exception e) {
log.info("处理请求失败,e:{}", e);
} finally {
// 释放请求
httpRequest.release();
}
}
/**
* 获取body参数
*
* @param request
* @return
*/
private String getBody(FullHttpRequest request) {
ByteBuf buf = request.content();
return buf.toString(CharsetUtil.UTF_8);
}
/**
* 发送的返回值
*
* @param ctx
* 返回
* @param context
* 消息
* @param status
* 状态
*/
private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HT
a566
TP_1_1, status, Unpooled.copiedBuffer(context, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/*
* 建立连接时,返回消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
super.channelActive(ctx);
}
/**
* 解析请求参数
*
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
*
* @throws IOException
* @throws MethodNotSupportedException
*/
public Map<String, String> parseRequest(FullHttpRequest request) throws IOException, MethodNotSupportedException {
HttpMethod method = request.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
decoder.parameters().entrySet().forEach(entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
decoder.offer(request);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不支持其它方法
throw new MethodNotSupportedException(""); //
}
return parmMap;
}
}
最后,根据自己的业务需要处理业务
比如:
/**
 * Example business service that answers market-data queries.
 */
@Component
public class MarketDataService {
    private static final Logger log = Log4jManager.get();

    /**
     * Builds the JSON reply for a market-data query.
     *
     * @param parseRequest request parameters; the {@code futuresCode} and {@code type} entries are read
     * @return the JSON form of a {@code Resp} carrying {@code ResCode.INVALID_PARAMTER} when
     *         {@code Tools.checkNull} flags either parameter, otherwise {@code ResCode.SUCCESS}
     */
    public String getMarketData(Map<String, String> parseRequest) {
        final String code = parseRequest.get("futuresCode");
        final String period = parseRequest.get("type");
        final boolean missingParam = Tools.checkNull(code) || Tools.checkNull(period);
        // TODO: query the in-memory market data by these conditions and return it.
        return Tools.toJson(missingParam
                ? new Resp(ResCode.INVALID_PARAMTER)
                : new Resp(ResCode.SUCCESS));
    }
}
效果:http://localhost:9090/marketinfo?futuresCode=1&type=1min
第二步:创建netty服务
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;
import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;
/**
 * Spring component that starts the Netty server when the bean is initialised and stops it when
 * the bean is destroyed.
 *
 * <p>The bind host and port are read from the {@code SOCKET.URL} and {@code SOCKET.PORT}
 * properties; per-connection pipeline setup is delegated to the injected
 * {@link SocketServerFilter}.
 */
@Component
@Lazy(false)
public class NettySocketServer {
    private static final Logger log = Log4jManager.get();

    @Value("${SOCKET.PORT}")
    private Integer socket_port;

    @Value("${SOCKET.URL}")
    private String socket_url;

    /** The bound server channel; stays {@code null} until {@link #init()} has bound successfully. */
    private Channel ch = null;

    EventLoopGroup bossGroup = null;
    EventLoopGroup workerGroup = null;

    @Autowired
    private SocketServerFilter socketServerFilter;

    /**
     * Creates the event-loop groups and binds the server socket.
     *
     * <p>If the calling thread is interrupted while waiting for the bind, the failure is logged,
     * the interrupt flag is restored and the event loops are shut down. Any other bind failure
     * propagates to the caller as before, but the event loops are shut down first so their
     * threads are not leaked.
     */
    @PostConstruct
    public void init() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(socket_url, socket_port)
                    .childHandler(socketServerFilter);
            ChannelFuture f = b.bind().sync();
            ch = f.channel();
            started = true;
            log.info("{} started and listen on {}", NettySocketServer.class.getName(), ch.localAddress());
            // Success is reported only when the bind really completed.
            log.info("启动websocket服务成功!!");
        } catch (InterruptedException e) {
            // Restore the flag so code further up the stack can still observe the interruption.
            Thread.currentThread().interrupt();
            log.error("启动websocket服务失败!!", e);
        } finally {
            if (!started) {
                shutdownEventLoops();
            }
        }
    }

    /**
     * Closes the server channel (if one was bound) and shuts down both event-loop groups.
     */
    @PreDestroy
    public void destory() {
        log.info("销毁websocket服务!!");
        if (ch != null) {
            ch.close();
        }
        shutdownEventLoops();
    }

    /** Requests a graceful shutdown of whichever event-loop groups have been created. */
    private void shutdownEventLoops() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
请求过滤并添加执行handler:
/**
 * Channel initializer that installs the HTTP handlers on every accepted connection.
 *
 * <p>The request path prefix comes from the {@code SOCKET.PATH} property and is handed, together
 * with the {@link MarketDataService}, to a fresh {@link NettyServerHandler} for each channel.
 */
@Component
@Lazy(false)
public class SocketServerFilter extends ChannelInitializer<SocketChannel> {
    @Value("${SOCKET.PATH}")
    private String socket_path;

    @Autowired
    private MarketDataService marketDataService;

    /**
     * Builds the pipeline: request decoder, response encoder, an aggregator limited to 65535
     * bytes of content, and finally the request handler.
     *
     * @param ch the newly accepted channel
     */
    @Override
    protected void initChannel(SocketChannel ch) throws Exception {
        // A ChunkedWriteHandler ("http-chunked") is not installed in this pipeline.
        ch.pipeline()
                .addLast("decoder", new HttpRequestDecoder())
                .addLast("encoder", new HttpResponseEncoder())
                .addLast("aggregator", new HttpObjectAggregator(65535))
                .addLast("request-handler", new NettyServerHandler(socket_path, marketDataService));
    }
}
注入参数:
对应handler处理请求:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = Log4jManager.get();
private String result = "";
private String socket_path;
private MarketDataService marketDataService;
public NettyServerHandler(String socket_path, MarketDataService marketDataService) {
this.socket_path = socket_path;
this.marketDataService = marketDataService;
}
/*
* 收到消息时,返回信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof FullHttpRequest)) {
result = "未知请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
FullHttpRequest httpRequest = (FullHttpRequest) msg;
try {
String path = httpRequest.uri(); // 获取路径
System.out.println("请求path:" + path);
String body = getBody(httpRequest); // 获取参数
System.out.println("请求参数:" + body);
HttpMethod method = httpRequest.method();// 获取请求方法
// 如果不是这个路径,就直接返回错误
if (!socket_path.equalsIgnoreCase(path) && !HttpMethod.GET.equals(method)) {
result = "非法请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
if (path.equals("/favicon.ico")) {
return;
}
if (!path.startsWith(socket_path)) {
result = "请求参数有误";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
Map<String, String> parseRequest = parseRequest(httpRequest);
log.info("请求参数parseRequest:{}", parseRequest);
// 接受到的消息,做业务逻辑处理...
result = marketDataService.getMarketData(parseRequest);
System.out.println(result);
send(ctx, result, HttpResponseStatus.OK);
return;
} catch (Exception e) {
log.info("处理请求失败,e:{}", e);
} finally {
// 释放请求
httpRequest.release();
}
}
/**
* 获取body参数
*
* @param request
* @return
*/
private String getBody(FullHttpRequest request) {
ByteBuf buf = request.content();
return buf.toString(CharsetUtil.UTF_8);
}
/**
* 发送的返回值
*
* @param ctx
* 返回
* @param context
* 消息
* @param status
* 状态
*/
private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HT
a566
TP_1_1, status, Unpooled.copiedBuffer(context, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}
/*
* 建立连接时,返回消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
super.channelActive(ctx);
}
/**
* 解析请求参数
*
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
*
* @throws IOException
* @throws MethodNotSupportedException
*/
public Map<String, String> parseRequest(FullHttpRequest request) throws IOException, MethodNotSupportedException {
HttpMethod method = request.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
decoder.parameters().entrySet().forEach(entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
decoder.offer(request);
List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();
for (InterfaceHttpData parm : parmList) {
Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}
} else {
// 不支持其它方法
throw new MethodNotSupportedException(""); //
}
return parmMap;
}
}
最后,根据自己的业务需要处理业务
比如:
/**
 * Example business service that answers market-data queries.
 */
@Component
public class MarketDataService {
    private static final Logger log = Log4jManager.get();

    /**
     * Builds the JSON reply for a market-data query.
     *
     * @param parseRequest request parameters; the {@code futuresCode} and {@code type} entries are read
     * @return the JSON form of a {@code Resp} carrying {@code ResCode.INVALID_PARAMTER} when
     *         {@code Tools.checkNull} flags either parameter, otherwise {@code ResCode.SUCCESS}
     */
    public String getMarketData(Map<String, String> parseRequest) {
        final String code = parseRequest.get("futuresCode");
        final String period = parseRequest.get("type");
        final boolean missingParam = Tools.checkNull(code) || Tools.checkNull(period);
        // TODO: query the in-memory market data by these conditions and return it.
        return Tools.toJson(missingParam
                ? new Resp(ResCode.INVALID_PARAMTER)
                : new Resp(ResCode.SUCCESS));
    }
}
效果:http://localhost:9090/marketinfo?futuresCode=1&type=1min
相关文章推荐
- WCF技术剖析之二十七: 如何将一个服务发布成WSDL[基于HTTP-GET的实现](提供模拟程序)
- HTTPSQS(HTTP Simple Queue Service)是一款基于 HTTP GET/POST 协议的轻量级开源简单消息队列服务
- 转:基于HTTP协议的轻量级开源简单队列服务:HTTPSQS
- Netty构建一个简单的http服务
- WCF技术剖析之二十七: 如何将一个服务发布成WSDL[基于HTTP-GET的实现](提供模拟程序)
- RestExpress 一个基于Netty的轻量级Rest服务开发框架
- 基于Netty4构建HTTP服务----浏览器访问和Netty客户端访问
- 基于TCP的协议封装及Netty搭建高可用网络通信服务
- [ZooKeeper.net] 1 模仿dubbo实现一个简要的http服务的注册 基于webapi
- [ZooKeeper.net] 1 模仿dubbo实现一个简要的http服务的注册 基于webapi
- 一个朴素的基于HTTP协议的项目
- 基于HTTP 协议的GET和POST请求服务
- 微服务框架Finagle介绍 Part2: 在Finagle中开发基于Http协议的应用
- android如何利用基于Http 协议的WebService服务来获取远程数据库数据
- HTTPSQS:基于HTTP协议的轻量级开源简单队列服务(安装php客户端)
- Netty构建一个简单的http服务
- 微服务框架Finagle介绍 Part2: 在Finagle中开发基于Http协议的应用
- 一个基于libcurl的多线程HTTP 请求服务模板
- ubuntu上构建简单的基于http的文件共享服务
- 一个基于Socket协议的手机聊天室源码