您的位置:首页 > 理论基础 > 计算机网络

基于Netty写一个http协议的服务

2018-02-08 10:31 393 查看
第一步:引入netty的jar包,本文使用的是netty-all-4.1.9.Final.jar
第二步:创建netty服务
import javax.annotation.PostConstruct;
import javax.annotation.PreDestroy;

import org.slf4j.Logger;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.nio.NioServerSocketChannel;

/**
 * Spring-managed bean that starts a Netty server when the bean is initialised
 * and stops it when the bean is destroyed.
 *
 * <p>The listen address and port are injected from the {@code SOCKET.URL} and
 * {@code SOCKET.PORT} properties; every accepted connection is configured by
 * the injected {@link SocketServerFilter}.
 */
@Component
@Lazy(false)
public class NettySocketServer {
    private static final Logger log = Log4jManager.get();

    @Value("${SOCKET.PORT}")
    private Integer socket_port;
    @Value("${SOCKET.URL}")
    private String socket_url;
    private Channel ch = null;
    EventLoopGroup bossGroup = null;
    EventLoopGroup workerGroup = null;

    @Autowired
    private SocketServerFilter socketServerFilter;

    /**
     * Creates the event-loop groups, binds the server socket and remembers the
     * bound channel.
     *
     * <p>If binding does not complete (interruption or any other failure) the
     * event-loop groups are shut down again so their threads do not leak. A
     * failure other than interruption still propagates to the caller, exactly
     * as before.
     */
    @PostConstruct
    public void init() {
        bossGroup = new NioEventLoopGroup();
        workerGroup = new NioEventLoopGroup();
        boolean started = false;
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .localAddress(socket_url, socket_port)
                    .childHandler(socketServerFilter);
            // sync() rethrows the bind failure (e.g. port already in use).
            ChannelFuture f = b.bind().sync();
            ch = f.channel();
            started = true;
            log.info(NettySocketServer.class.getName() + " started and listen on " + ch.localAddress());
            log.info("启动websocket服务成功!!");
        } catch (InterruptedException e) {
            // Restore the interrupt flag so code further up the stack can see it.
            Thread.currentThread().interrupt();
            log.error("启动websocket服务失败!!", e);
        } finally {
            if (!started) {
                // Covers both the interrupted case and unchecked bind failures.
                shutdownGroups();
            }
        }
    }

    /**
     * Closes the server channel (if one was bound) and shuts down both
     * event-loop groups. The method name keeps its original spelling because
     * it is part of the class's public interface.
     */
    @PreDestroy
    public void destory() {
        log.info("销毁websocket服务!!");
        if (ch != null) {
            ch.close();
            ch = null;
        }
        shutdownGroups();
    }

    /** Gracefully shuts down whichever event-loop groups have been created. */
    private void shutdownGroups() {
        if (bossGroup != null) {
            bossGroup.shutdownGracefully();
        }
        if (workerGroup != null) {
            workerGroup.shutdownGracefully();
        }
    }
}
请求过滤并添加执行handler:
@Component
@Lazy(false)
public class SocketServerFilter extends ChannelInitializer<SocketChannel>{

@Value("${SOCKET.PATH}")
private String socket_path;
@Autowired
private MarketDataService marketDataService;

/**
 * Builds the handler pipeline for a newly accepted connection.
 *
 * <p>Order matters: the HTTP decoder and encoder come first, then the
 * aggregator (constructed with a 65535 limit) that combines the decoded parts
 * into a single full message, and finally a fresh {@link NettyServerHandler}
 * that receives the injected path and service.
 */
@Override
protected void initChannel(SocketChannel ch) throws Exception {
    // A ChunkedWriteHandler ("http-chunked") is intentionally not installed.
    ch.pipeline()
            .addLast("decoder", new HttpRequestDecoder())
            .addLast("encoder", new HttpResponseEncoder())
            .addLast("aggregator", new HttpObjectAggregator(65535))
            .addLast("request-handler", new NettyServerHandler(socket_path, marketDataService));
}

}
注入参数:



对应handler处理请求:
public class NettyServerHandler extends ChannelInboundHandlerAdapter {
private static final Logger log = Log4jManager.get();

private String result = "";
private String socket_path;
private MarketDataService marketDataService;

public NettyServerHandler(String socket_path, MarketDataService marketDataService) {
this.socket_path = socket_path;
this.marketDataService = marketDataService;
}

/*
* 收到消息时,返回信息
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (!(msg instanceof FullHttpRequest)) {
result = "未知请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
FullHttpRequest httpRequest = (FullHttpRequest) msg;
try {
String path = httpRequest.uri(); // 获取路径
System.out.println("请求path:" + path);
String body = getBody(httpRequest); // 获取参数
System.out.println("请求参数:" + body);
HttpMethod method = httpRequest.method();// 获取请求方法
// 如果不是这个路径,就直接返回错误
if (!socket_path.equalsIgnoreCase(path) && !HttpMethod.GET.equals(method)) {
result = "非法请求!";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
if (path.equals("/favicon.ico")) {
return;
}
if (!path.startsWith(socket_path)) {
result = "请求参数有误";
send(ctx, result, HttpResponseStatus.BAD_REQUEST);
return;
}
Map<String, String> parseRequest = parseRequest(httpRequest);
log.info("请求参数parseRequest:{}", parseRequest);
// 接受到的消息,做业务逻辑处理...
result = marketDataService.getMarketData(parseRequest);
System.out.println(result);
send(ctx, result, HttpResponseStatus.OK);
return;
} catch (Exception e) {
log.info("处理请求失败,e:{}", e);
} finally {
// 释放请求
httpRequest.release();
}
}

/**
* 获取body参数
*
* @param request
* @return
*/
private String getBody(FullHttpRequest request) {
ByteBuf buf = request.content();
return buf.toString(CharsetUtil.UTF_8);
}

/**
* 发送的返回值
*
* @param ctx
* 返回
* @param context
* 消息
* @param status
* 状态
*/
private void send(ChannelHandlerContext ctx, String context, HttpResponseStatus status) {
FullHttpResponse response = new DefaultFullHttpResponse(HttpVersion.HT
a566
TP_1_1, status, Unpooled.copiedBuffer(context, CharsetUtil.UTF_8));
response.headers().set(HttpHeaderNames.CONTENT_TYPE, "text/plain; charset=UTF-8");
ctx.writeAndFlush(response).addListener(ChannelFutureListener.CLOSE);
}

/*
* 建立连接时,返回消息
*/
@Override
public void channelActive(ChannelHandlerContext ctx) throws Exception {
System.out.println("连接的客户端地址:" + ctx.channel().remoteAddress());
ctx.writeAndFlush("客户端" + InetAddress.getLocalHost().getHostName() + "成功与服务端建立连接! ");
super.channelActive(ctx);
}

/**
* 解析请求参数
*
* @return 包含所有请求参数的键值对, 如果没有参数, 则返回空Map
*
* @throws IOException
* @throws MethodNotSupportedException
*/
public Map<String, String> parseRequest(FullHttpRequest request) throws IOException, MethodNotSupportedException {
HttpMethod method = request.method();
Map<String, String> parmMap = new HashMap<>();
if (HttpMethod.GET == method) {
// 是GET请求
QueryStringDecoder decoder = new QueryStringDecoder(request.uri());
decoder.parameters().entrySet().forEach(entry -> {
// entry.getValue()是一个List, 只取第一个元素
parmMap.put(entry.getKey(), entry.getValue().get(0));
});
} else if (HttpMethod.POST == method) {
// 是POST请求
HttpPostRequestDecoder decoder = new HttpPostRequestDecoder(request);
decoder.offer(request);

List<InterfaceHttpData> parmList = decoder.getBodyHttpDatas();

for (InterfaceHttpData parm : parmList) {

Attribute data = (Attribute) parm;
parmMap.put(data.getName(), data.getValue());
}

} else {
// 不支持其它方法
throw new MethodNotSupportedException(""); //
}
return parmMap;
}
}
最后,根据自己的业务需要处理业务
比如:
@Component
public class MarketDataService {

private static final Logger log = Log4jManager.get();

/**
 * Looks up market data for the given request parameters and returns the
 * outcome serialised with {@code Tools.toJson}.
 *
 * <p>The parameters {@code futuresCode} and {@code type} are both required;
 * if {@code Tools.checkNull} flags either of them, an
 * {@code INVALID_PARAMTER} response is returned.
 *
 * @param parseRequest request parameters keyed by name
 * @return the response serialised by {@code Tools.toJson}
 */
public String getMarketData(Map<String, String> parseRequest) {
    final String code = parseRequest.get("futuresCode");
    final String period = parseRequest.get("type");
    if (!Tools.checkNull(code) && !Tools.checkNull(period)) {
        // TODO query the in-memory market data by these conditions and return it
        return Tools.toJson(new Resp(ResCode.SUCCESS));
    }
    return Tools.toJson(new Resp(ResCode.INVALID_PARAMTER));
}
}
效果:http://localhost:9090/marketinfo?futuresCode=1&type=1min

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: