基于Websocket的实时数据看板
2016-05-27 17:39
246 查看
刚进了一家新公司不久,很喜欢这里的环境,技术氛围浓厚,同事大牛不少,相信我会在这里学到很多东西,尤其是大数据方向的,哈哈哈哈,不废话了。
最近在做的一个项目,是一个大数据分析平台,有如下需求:有如果个实验设备运行并且将运行数据通过socket发送到分析平台,分析平台通过运行socket作业来完成对socket数据的接收,同时还需要对接收到的socket数据进行解析并且可以通过一个数据看板对数据实时监控。如下图所示:
选择websocket的原因,就是看板页面不必每次都去定时的请求数据,而是一旦websocket服务器发现有新数据则直接推送给客户端进行渲染,减少了占用的宽带和服务器资源。由于socket数据量多且快,所以如果将数据直接存入到传统的关系型数据库则会遇到效率瓶颈,这时候就必须要有一个缓存,redis基于内存且最大的特点就是快,用作缓存再合适不过,待作业结束后再将数据保存进hbase中即可。看板用到了baidu的echarts3折线图。
接收到的socket数据源源不断的被写入到redis中,接收到的数据格式类似于:数据量1|数据量2|数据量3|日期1|日期2...websocket的任务就是将这些数据交给看板进行渲染,那么怎么去拿到数据呢?就是websocket服务端定时的去redis拿,一旦有新数据则推送给看板。
首先开发websocket服务端,ServerSocket采用单例模式,因为是集成服务,所以如果需要关闭服务的话可以直接将ServerSocket关闭即可,服务器端代码如下:
PrintWriter pw = getWriter(socket);
byte[] buf = new byte[1024];
int len = in.read(buf, 0, 1024);
byte[] res = new byte[len];
System.arraycopy(buf, 0, res, 0, len);
String key = new String(res);
if (!hasHandshake && key.indexOf("Key") > 0) {
key = key.substring(0, key.indexOf("==") + 2);
key = key.substring(key.indexOf("Key") + 4, key.length())
.trim();
key += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
MessageDigest md = MessageDigest.getInstance("SHA-1");
md.update(key.getBytes("utf-8"), 0, key.length());
byte[] sha1Hash = md.digest();
sun.misc.BASE64Encoder encoder = new sun.misc.BASE64Encoder();
key = encoder.encode(sha1Hash);
pw.println("HTTP/1.1 101 Switching Protocols");
pw.println("Upgrade: websocket");
pw.println("Connection: Upgrade");
pw.println("Sec-WebSocket-Accept: " + key);
pw.println();
pw.flush();
hasHandshake = true;
}
上面这段代码是websocket握手的代码,握手之后获取看板传来的id,然后再去redis请求数据,发现新数据则推送给看板,推送的代码:
服务器端代码的核心基本上是这些,那么现在是前段的代码了,因为是使用了echarts的折线图,所以至于echarts的使用这里不赘述,在页面加载完毕后连接websocket服务端并且发送id以及其他的一些参数,连接的代码如下所示:
发送数据给服务器端的代码:
websocket的几个方法分别是onopen:连接上后执行;onmessage:接收到数据后执行的逻辑,还有onclose以及onerror都很好理解啦。
最近在做的一个项目,是一个大数据分析平台,有如下需求:有如果个实验设备运行并且将运行数据通过socket发送到分析平台,分析平台通过运行socket作业来完成对socket数据的接收,同时还需要对接收到的socket数据进行解析并且可以通过一个数据看板对数据实时监控。如下图所示:
选择websocket的原因,就是看板页面不必每次都去定时的请求数据,而是一旦websocket服务器发现有新数据则直接推送给客户端进行渲染,减少了占用的宽带和服务器资源。由于socket数据量多且快,所以如果将数据直接存入到传统的关系型数据库则会遇到效率瓶颈,这时候就必须要有一个缓存,redis基于内存且最大的特点就是快,用作缓存再合适不过,待作业结束后再将数据保存进hbase中即可。看板用到了baidu的echarts3折线图。
接收到的socket数据源源不断的被写入到redis中,接收到的数据格式类似于:数据量1|数据量2|数据量3|日期1|日期2...websocket的任务就是将这些数据交给看板进行渲染,那么怎么去拿到数据呢?就是websocket服务端定时的去redis拿,一旦有新数据则推送给看板。
首先开发websocket服务端,ServerSocket采用单例模式,因为是集成服务,所以如果需要关闭服务的话可以直接将ServerSocket关闭即可,服务器端代码如下:
PrintWriter pw = getWriter(socket);
byte[] buf = new byte[1024];
int len = in.read(buf, 0, 1024);
byte[] res = new byte[len];
System.arraycopy(buf, 0, res, 0, len);
String key = new String(res);
if (!hasHandshake && key.indexOf("Key") > 0) {
key = key.substring(0, key.indexOf("==") + 2);
key = key.substring(key.indexOf("Key") + 4, key.length())
.trim();
key += "258EAFA5-E914-47DA-95CA-C5AB0DC85B11";
MessageDigest md = MessageDigest.getInstance("SHA-1");
md.update(key.getBytes("utf-8"), 0, key.length());
byte[] sha1Hash = md.digest();
sun.misc.BASE64Encoder encoder = new sun.misc.BASE64Encoder();
key = encoder.encode(sha1Hash);
pw.println("HTTP/1.1 101 Switching Protocols");
pw.println("Upgrade: websocket");
pw.println("Connection: Upgrade");
pw.println("Sec-WebSocket-Accept: " + key);
pw.println();
pw.flush();
hasHandshake = true;
}
上面这段代码是websocket握手的代码,握手之后获取看板传来的id,然后再去redis请求数据,发现新数据则推送给看板,推送的代码:
/** * 推送数据到客户端 * * @param byteBuf * @param finalFragment * @throws IOException */ private void responseClient(ByteBuffer byteBuf, boolean finalFragment) throws IOException { OutputStream out = socket.getOutputStream(); int first = 0x00; // 是否是输出最后的WebSocket响应片段 if (finalFragment) { first = first + 0x80; first = first + 0x1; } out.write(first); if (byteBuf.limit() < 126) { out.write(byteBuf.limit()); } else if (byteBuf.limit() < 65536) { out.write(126); out.write(byteBuf.limit() >>> 8); out.write(byteBuf.limit() & 0xFF); } else { out.write(127); out.write(0); out.write(0); out.write(0); out.write(0); out.write(byteBuf.limit() >>> 24); out.write(byteBuf.limit() >>> 16); out.write(byteBuf.limit() >>> 8); out.write(byteBuf.limit() & 0xFF); } // Write the content out.write(byteBuf.array(), 0, byteBuf.limit()); out.flush(); }
服务器端代码的核心基本上是这些,那么现在是前段的代码了,因为是使用了echarts的折线图,所以至于echarts的使用这里不赘述,在页面加载完毕后连接websocket服务端并且发送id以及其他的一些参数,连接的代码如下所示:
function connect(form_data) { try { var host = "ws://localhost:8000"; websocket = new WebSocket(host); websocket.onopen = function() { if (form_data != null) sendToServer(form_data);// 与服务器端连接成功后需要将作业实例ID发送给服务器端 }; websocket.onmessage = function(event) {// 服务器端推送过来的数据格式必须是:数据量|日期 // ,其中日期格式为:yyyy-MM-dd HH:mm:ss:SS processMultiData(event.data);// 处理多行数据 if (option2.series.length == 0) genLegend(); genSeries(); myChart.setOption(option2); }; websocket.onclose = function() { // alert('连接关闭'); }; } catch (exception) { alert("error"); } }
发送数据给服务器端的代码:
function sendToServer(option_data) {// 发送实例ID往ws服务器端 try { websocket.send(option_data); } catch (exception) { alert("发送实例ID出错!" + exception); } }
websocket的几个方法分别是onopen:连接上后执行;onmessage:接收到数据后执行的逻辑,还有onclose以及onerror都很好理解啦。
相关文章推荐
- java-模拟tomcat服务器
- Linux socket 初步
- 2015-2016网页设计趋势分析 Web Design of Trends
- redis安装问题小结
- java socket 注意的地方
- java socket 注意的地方
- 使用 Redis 和 Python 构建一个共享单车的应用程序
- Redis偶发连接失败案例实战记录
- Redis中实现查找某个值的范围
- win 7 安装redis服务【笔记】
- redis的hGetAll函数的性能问题(记Redis那坑人的HGETALL)
- Redis和Memcached的区别详解
- 分割超大Redis数据库例子
- Redis总结笔记(一):安装和常用命令
- Redis sort 排序命令详解
- 用Redis实现微博关注关系
- Redis实现信息已读未读状态提示
- C#基于socket模拟http请求的方法
- redis中修改配置文件中的端口号 密码方法
- 在Ruby on Rails上使用Redis Store的方法