TCP通信转HTTP桥接器(转发zabbix数据为例)
2016-05-30 20:07
471 查看
需求场景
由于项目需要我们有大量服务器部署在互联网上,需要zabbix_agent监控服务器数据,但是用于接收监控数据的zabbix_server对访问安全要求较为严格。zabbix_server部署在专网内,仅接收来自CDN的HTTP请求,同时对Host访问地址有大量限制。zabbix默认的通信协议为基于TCP协议的自定义通信协议详见:zabbix sender协议的研究,因此必然无法通过CDN,防火墙最终到达zabbix_server。因此我们需要将zabbix_agent与server之间的通信通过HTTP桥接起来。
将原TCP通信的C/S结构软件的数据经由HTTP转发。即:
SERVER <-TCP-> CLIENT
转为:
SERVER <-TCP-> BRIDGE <- HTTP -> BRIDGE <- TCP -> CLIENT
由此实现了C/S间穿过CDN,防火墙等复杂网络设备下的通信。
需要注意的是,由于客户端服务器无固定IP,这里的zabbix_agent使用active模式主动往server汇报数据,并接受server的响应。
当然,本文程序由于不涉及处理TCP数据,因此只要是基于TCP的C/S结构软件,均可以使用本文的方法实现HTTP桥接。
流程与程序设计
数据传输流程
分别在客户端与服务端开启桥接器服务,在客户端桥接器启用TCP监听,在服务端桥接器启用HTTP监听。1、zabbix_agent 启动,通过TCP发起check active查询,被客户端桥接器监听到。
2、客户端桥接器将TCP数据(byte[])经过Base64编码,通过HTTP方式发送到服务端桥接器。(此时TCP与HTTP连接均未断开)
3、服务端桥接器接收到HTTP请求,解析Base64编码的数据后将其恢复成原始byte[],通过TCP方式发送到zabbix_server,并接受响应。将响应数据经过Base64编码后通过HTTP返回给客户端。
4、客户端获得HTTP 200响应后,从中解析响应字符串,通过Base64恢复为原zabbix_server响应的byte[],并通过TCP发送给zabbix_agent,断开socket。(zabbix_agent 规定一次TCP通信只允许发送一条数据,所以此处断开SOCKET是正确的)
一次桥接通信完成。
程序设计
支持两种启动模式
桥接器功能简单,无需分别实现TCP,HTTP两套程序。只要封装在一个程序里,支持两种启动模式即可。// 启动服务端HTTP服务 java -jar tcp2httpbridge.jar HTTP // 启动客户端TCP服务 java -jar tcp2httpbridge.jar TCP
主程序入口如下
public static void main(String[] args) { if(args.length<1){ // 没有指定启动方式 System.out.println("USAGE : java -jar tcp2httpbridge.jar [HTTP/TCP]"); } else if(args[0].toUpperCase().equals(StaticValue.APPTYPE.HTTP)){ // 以HTTP方式启动 HTTPServer.start(); } else if(args[0].toUpperCase().equals(StaticValue.APPTYPE.TCP)){ // 以TCP方式启动 TCPServer.startServer( Integer.parseInt(ConfigLoader.getInstance().getValue("local.tcp.port")), TCPServer.class); } else { //参数不合法 System.out.println("USAGE : java -jar tcp2httpbridge.jar [HTTP/TCP]"); } }
程序参数支持配置
将程序常用参数写入app.properties文件中,实现程序的灵活配置。本示例提取的参数有:
# 应用HTTP项目名称 app.project=tcp2httpbridge # 最大TCP/HTTP读取缓冲大小 app.maxbuffer=20480 # 最大HTTP监听数量 app.maxhttphandler=100 # TCP模式时,本地TCP监听端口(zabbix_agent配置中将ServerActive端口改为此即可被监听) local.tcp.port=1234 # HTTP模式时,本地HTTP监听端口 local.http.port=8888 # 转发HTTP服务器地址(此处指部署有zabbix_server的服务器地址) remote.http.server=http://xxx.xxx.xxx.xxx # 转发HTTP服务端口 remote.http.port=8888 # 发送TCP目的地址(IP或域名均可) remote.tcp.server=localhost # 发送TCP目的端口(10051对应zabbix_server监听agent数据的端口) remote.tcp.port=10051 # HTTP请求API地址 api.zabbix=zabbix
关键步骤
多线程监听TCP请求
由于一次TCP通信桥接可能较为耗时,若有多个zabbix_agent客户端同时经过此桥接器转接,可能出现多个TCP请求并发的情况,因此不能仅靠一个TCP线程监听,这样会阻塞后续的TCP请求。因此应该在接到TCP请求后,马上将其交给一个线程单独处理,主线程马上恢复监听。
参考来源:java tcp 端口监听,代码如下
public class TCPServer extends Thread{ protected Socket socket; private static final Logger logger = LoggerFactory.getLogger(TCPServer.class); public void run() { try { InputStream is = socket.getInputStream(); byte[] buf = new byte[Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxbuffer"))]; int length = is.read(buf); byte[] data = new byte[length]; for(int i=0;i<length;i++){ data[i] = buf[i]; } logger.info("TCP server 接收到数据: " + new String(data)); socket.shutdownInput(); ResultInfo result = HttpSender.send( ConfigLoader.getInstance().getValue("remote.http.server")+":"+ ConfigLoader.getInstance().getValue("remote.http.port")+"/"+ ConfigLoader.getInstance().getValue("app.project")+"/"+ ConfigLoader.getInstance().getValue("api.zabbix"), data); if(result.getStateId()<0){ logger.error("状态码不为0,"+result.getErrorMsg()); } else { logger.info("HTTP返回:"+result); byte[] en = result.getContent().getBytes(); byte[] de = Base64Util.decryBytes(en); logger.info("TCP 写入:"+new String(de)); OutputStream os = socket.getOutputStream(); os.write(de); os.flush(); socket.shutdownOutput(); } logger.info("完成TCP交互,退出socket"); socket.close(); } catch (Exception e) { logger.error("接收TCP数据出现错误", e); } } public static void startServer(int port, Class obj){ ServerSocket serverSocket; try { serverSocket = new ServerSocket(port); logger.info("TCP server 监听 : " + port); while(true){ Socket e =null; try { // 在这里一直等待链接 e = serverSocket.accept(); // 一旦链接,将socket转入新进程进行处理,主进程重新监听新请求 logger.info("建立新连接..."); TCPServer server = (TCPServer) obj.newInstance(); server.socket = e; server.start(); } catch (InstantiationException e1) { logger.error("建立TCP连接错误",e1); e.close(); } catch (IllegalAccessException e1) { logger.error("建立TCP连接错误",e1); e.close(); } } } catch (IOException e) { logger.error("启动TCP监听错误",e); } } }
多线程监听HTTP请求
通过Jersey和sun httpserver建立一个简单的HTTP服务即可。代码如下:public class HTTPServer { private static final Logger logger = LoggerFactory.getLogger(HTTPServer.class); public static void start() { try { ContextLoader.load(); HttpServerProvider provider = HttpServerProvider.provider(); HttpServer httpserver =provider.createHttpServer( new InetSocketAddress(Integer.parseInt(ConfigLoader.getInstance().getValue("local.http.port"))), Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxhttphandler"))); httpserver.createContext(ContextLoader.contextPath, new CoreHandler()); httpserver.setExecutor(null); httpserver.start(); logger.info("HTTP 服务开启成功 :" + Integer.parseInt(ConfigLoader.getInstance().getValue("local.http.port"))); } catch (NumberFormatException e) { logger.error("HTTP 服务开启失败",e); } catch (IOException e) { logger.error("HTTP 服务开启失败",e); } } }
Socket的处理
socket处理简单的接受inputstrea和输出outputstream就不细说了,这里说明我遇到的问题:1、不管是zabbix_agent或是zabbix_server均没有输出结束标志,因此简单的通过DataoutputStream或者BufferedStreamReader的readLine()方式均会遇到程序不知道TCP数据合适传输结束,连接一直保持等待输入直到超时断开为止。
这种处理是不可行的,此处应该直接处理inputstream,将TCP数据一次性读到byte[]中,有多少读多少,然后直接开始处理。
byte[] responseData = new byte[Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxbuffer"))]; int readCount = 0; while (true) { int read = is.read(responseData, 0, responseData.length - readCount); if (read <= 0) { break; } readCount += read; } byte[] r = new byte[readCount]; for(int i = 0 ; i < readCount; i ++){ r[i] = responseData[i]; } // responseData缓冲预留太长,r是最终的byte[]
2、遇到与server交互,既需要TCP输入也需要TCP输出的交互情景,inputstream和outputstream一定要同时获取到
outputstream要flush后才能获得服务器响应。
Socket socket = new Socket(ip, port); OutputStream os = socket.getOutputStream(); InputStream is = socket.getInputStream(); logger.info("开始输出socket..."); os.write(content); os.flush(); logger.info("开始读入socket..."); byte[] responseData = new byte[Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxbuffer"))]; int readCount = 0; while (true) { int read = is.read(responseData, 0, responseData.length - readCount); if (read <= 0) { break; } readCount += read; } byte[] r = new byte[readCount]; for(int i = 0 ; i < readCount; i ++){ r[i] = responseData[i]; } logger.info("关闭socket..."); os.close(); is.close(); socket.close(); logger.info("读入socket结束,接收返回:"+new String(r)); return r;
HTTP数据封装
如果需要传递byte[],一定要经过Base64编码,同时,如果包装过返回数据结构,并通过fastJson或者其他序列化工具处理的话,目前由于为止原因,fastJson无法处理HashMap<String, byte[]>这种类型,所以本文程序我只通过HTTP传输Base64处理后的String。
效果
客户端
zabbix_agent请求被桥接器监听到->通过HTTP发送->接收到HTTP返回->将返回数据通过TCP发送回zabbix_agent服务端
桥接器接到HTTP请求->转为TCP数据发送给zabbix_server->接收到zabbix_server响应->通过HTTP发回响应zabbix_server监控页面效果
成功在server端注册agent,并上传了一个监控数据。相关文章推荐
- RPC failed; result=22, HTTP code = 411
- Python 实现Zabbix自动发送报表
- Outlook 批量发送邮件
- HTTP Header 属性列表
- nginx中http核心模块的配置指令2
- nginx中http核心模块的配置指令3
- nginx中http核心模块的配置指令4
- nginx中http的fastcgi模块的配置指令1
- Zabbix安装详解
- 如何在 Linux 中快速地通过 HTTP 提供文件访问服务
- 用zabbix监控nginx_status状态
- 深入HTTP head的使用详解
- Ruby程序中发送基于HTTP协议的请求的简单示例
- ASP 中使用 HTTP 协议发送参数详解
- C#基于socket模拟http请求的方法
- http www安全必备知识
- C#端口转发用法详解