您的位置:首页 > 其它

WebSocket在各种框架下的应用

2017-12-17 20:30 155 查看

1. 概述

WebSocket协议是基于TCP的一种新的网络协议。

它实现了浏览器与服务器全双工(full-duplex)通信——允许服务器主动发送信息给客户端。

最初的http协议不支持服务器端向浏览器主动推送消息,需要各个浏览器安装插件才能支持。

后来随着时代的发展,越来越多的网页端需要更好、更灵活,轻量的与服务器通讯,因此新版本的标准开始支持WebSocket协议。

2. 特点

相比HTTP长连接,WebSocket有以下特点:

* 是真正的全双工方式,建立连接后客户端与服务器端是完全平等的,可以互相主动请求。而HTTP长连接基于HTTP,是传统的客户端对服务器发起请求的模式。

* HTTP长连接中,每次数据交换除了真正的数据部分外,服务器和客户端还要大量交换HTTP header,信息交换效率很低。Websocket协议通过第一个request建立了TCP连接之后,之后交换的数据都不需要发送 HTTP header就能交换数据,这显然和原有的HTTP协议有区别所以它需要对服务器和客户端都进行升级才能实现(主流浏览器都已支持HTML5)。此外还有 multiplexing、不同的URL可以复用同一个WebSocket连接等功能。这些都是HTTP长连接不能做到的。

3. 应用

3.1 在play框架

官网

3.1.1 在route上配置

GET     /dm/ws/subscribe                         elemental.message.controllers.HomeController.ws


3.1.2 使用akka stream和actor和websocket握手

def ws: WebSocket = WebSocket.acceptOrResult[JsValue, JsValue]


3.1.3 创建websocket连接

def createWebSocketConnections(): (ActorRef, Publisher[JsValue]) = {

//创建 source
val source: Source[JsValue, ActorRef] = {

val logging = Logging(actorSystem.eventStream, logger.getName)

// 设置参数和超出策略
Source.actorRef[JsValue](10, OverflowStrategy.dropTail).log("actorRefSource")(logging)
}

// 创建 Publisher Sink
val sink: Sink[JsValue, Publisher[JsValue]] = Sink.asPublisher(fanout = false)

// source sink关联 Connect the source and sink into a flow, telling it to keep the materialized values,
source.toMat(sink)(Keep.both).run()
}


3.1.4 建立处理流程

// 根据requestId sid创建actor
val userActorFuture = createUserActor(request.id.toString, sid, webSocketOut)

// Once we have an actor available, create a flow...
userActorFuture.map { userActor =>
createWebSocketFlow(webSocketIn, userActor)
}


3.2 Tyrus实现

3.2.1 类配置

@ServerEndpoint(value = "/subscribe", decoders = {
SubscribeMsgDecoder.class
}, encoders = {
SubscribeMsgEncoder.class
}, configurator = WebSocketServerConfig.class)
public class WebSocketServerEndpoint {}


3.2.2 Actor 初始化

ActorSystem system = ActorSystem.create("WebSocketServer");
actor = system.actorOf(Props.create(PushMsgActor.class, ssnMap).withRouter(new SmallestMailboxPool(config.getActorNum())));
mqSub = new MQSubscribePoint(configure, config.getExchangeName(), config.getRoutingKey(), config.getQueueName(), actor);


3.2.3 服务启动

Server server = new Server (host [0], config.getPort (), path, null, WebSocketServerEndpoint.class);


3.3 Netty Thread实现

public void run() {

//服务端辅助启动类
ServerBootstrap bootstrap = new ServerBootstrap();
//服务端接受客户端的连接线程
EventLoopGroup bossGroup = new NioEventLoopGroup();
//网络读写线程
EventLoopGroup workerGroup = new NioEventLoopGroup();

try{
bootstrap.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.option(ChannelOption.SO_BACKLOG, 1024)
.childOption(ChannelOption.SO_KEEPALIVE, true)
.childHandler(new ChannelInitializer<SocketChannel>(){

protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new ResolveMessageHandler());//adapte1
ch.pipeline().addLast(event,new KafKaWriteHandler());//adapter2
}

});

ChannelFuture cfuture = bootstrap.bind(Integer.parseInt((String)pros.get("server-port"))).sync();
log.info("NettyServer 启动... ");
//等待服务端退出
cfuture.channel().closeFuture().sync();

}catch (Exception e) {
log.error("NettyServer 启动异常:" + e.getMessage(), e);
}finally{
//释放线程资源
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
}
}


4. 总结

后台创建websocket server过程

1. 提供对外接口

2. 对内创建actor或者adapte服务

3. 内部接受各种信息并push不同数据
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  websocket