您的位置:首页 > 理论基础 > 计算机网络

TCP通信转HTTP桥接器(转发zabbix数据为例)

2016-05-30 20:07 471 查看

需求场景

由于项目需要我们有大量服务器部署在互联网上,需要zabbix_agent监控服务器数据,但是用于接收监控数据的zabbix_server对访问安全要求较为严格。zabbix_server部署在专网内,仅接收来自CDN的HTTP请求,同时对Host访问地址有大量限制。

zabbix默认的通信协议为基于TCP协议的自定义通信协议详见:zabbix sender协议的研究,因此必然无法通过CDN,防火墙最终到达zabbix_server。因此我们需要将zabbix_agent与server之间的通信通过HTTP桥接起来。

将原TCP通信的C/S结构软件的数据经由HTTP转发。即:

SERVER <-TCP-> CLIENT

转为:

SERVER <-TCP-> BRIDGE <- HTTP -> BRIDGE <- TCP -> CLIENT

由此实现了C/S间穿过CDN,防火墙等复杂网络设备下的通信。

需要注意的是,由于客户端服务器无固定IP,这里的zabbix_agent使用active模式主动往server汇报数据,并接受server的响应。

当然,本文程序由于不涉及处理TCP数据,因此只要是基于TCP的C/S结构软件,均可以使用本文的方法实现HTTP桥接。

流程与程序设计

数据传输流程

分别在客户端与服务端开启桥接器服务,在客户端桥接器启用TCP监听,在服务端桥接器启用HTTP监听。

1、zabbix_agent 启动,通过TCP发起check active查询,被客户端桥接器监听到。

2、客户端桥接器将TCP数据(byte[])经过Base64编码,通过HTTP方式发送到服务端桥接器。(此时TCP与HTTP连接均未断开)

3、服务端桥接器接收到HTTP请求,解析Base64编码的数据后将其恢复成原始byte[],通过TCP方式发送到zabbix_server,并接受响应。将响应数据经过Base64编码后通过HTTP返回给客户端。

4、客户端获得HTTP 200响应后,从中解析响应字符串,通过Base64恢复为原zabbix_server响应的byte[],并通过TCP发送给zabbix_agent,断开socket。(zabbix_agent 规定一次TCP通信只允许发送一条数据,所以此处断开SOCKET是正确的)

一次桥接通信完成。

程序设计

支持两种启动模式

桥接器功能简单,无需分别实现TCP,HTTP两套程序。只要封装在一个程序里,支持两种启动模式即可。

// 启动服务端HTTP服务
java -jar tcp2httpbridge.jar HTTP
// 启动客户端TCP服务
java -jar tcp2httpbridge.jar TCP


主程序入口如下

public static void main(String[] args) {

if(args.length<1){
// 没有指定启动方式
System.out.println("USAGE : java -jar tcp2httpbridge.jar [HTTP/TCP]");
} else if(args[0].toUpperCase().equals(StaticValue.APPTYPE.HTTP)){
// 以HTTP方式启动
HTTPServer.start();
} else if(args[0].toUpperCase().equals(StaticValue.APPTYPE.TCP)){
// 以TCP方式启动
TCPServer.startServer(
Integer.parseInt(ConfigLoader.getInstance().getValue("local.tcp.port")),
TCPServer.class);
} else {
//参数不合法
System.out.println("USAGE : java -jar tcp2httpbridge.jar [HTTP/TCP]");
}
}


程序参数支持配置

将程序常用参数写入app.properties文件中,实现程序的灵活配置。

本示例提取的参数有:

# 应用HTTP项目名称
app.project=tcp2httpbridge
# 最大TCP/HTTP读取缓冲大小
app.maxbuffer=20480
# 最大HTTP监听数量
app.maxhttphandler=100

# TCP模式时,本地TCP监听端口(zabbix_agent配置中将ServerActive端口改为此即可被监听)
local.tcp.port=1234
# HTTP模式时,本地HTTP监听端口
local.http.port=8888

# 转发HTTP服务器地址(此处指部署有zabbix_server的服务器地址)
remote.http.server=http://xxx.xxx.xxx.xxx
# 转发HTTP服务端口
remote.http.port=8888
# 发送TCP目的地址(IP或域名均可)
remote.tcp.server=localhost
# 发送TCP目的端口(10051对应zabbix_server监听agent数据的端口)
remote.tcp.port=10051

# HTTP请求API地址
api.zabbix=zabbix


关键步骤

多线程监听TCP请求

由于一次TCP通信桥接可能较为耗时,若有多个zabbix_agent客户端同时经过此桥接器转接,可能出现多个TCP请求并发的情况,因此不能仅靠一个TCP线程监听,这样会阻塞后续的TCP请求。

因此应该在接到TCP请求后,马上将其交给一个线程单独处理,主线程马上恢复监听。

参考来源:java tcp 端口监听,代码如下

public class TCPServer extends Thread{

protected Socket socket;
private static final Logger logger = LoggerFactory.getLogger(TCPServer.class);

public void run() {
try {
InputStream is = socket.getInputStream();
byte[] buf = new byte[Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxbuffer"))];
int length = is.read(buf);
byte[] data = new byte[length];
for(int i=0;i<length;i++){
data[i] = buf[i];
}
logger.info("TCP server 接收到数据: " + new String(data));
socket.shutdownInput();
ResultInfo result = HttpSender.send(
ConfigLoader.getInstance().getValue("remote.http.server")+":"+
ConfigLoader.getInstance().getValue("remote.http.port")+"/"+
ConfigLoader.getInstance().getValue("app.project")+"/"+
ConfigLoader.getInstance().getValue("api.zabbix"), data);

if(result.getStateId()<0){
logger.error("状态码不为0,"+result.getErrorMsg());
} else {
logger.info("HTTP返回:"+result);
byte[] en = result.getContent().getBytes();
byte[] de = Base64Util.decryBytes(en);
logger.info("TCP 写入:"+new String(de));
OutputStream os = socket.getOutputStream();
os.write(de);
os.flush();
socket.shutdownOutput();
}
logger.info("完成TCP交互,退出socket");
socket.close();
} catch (Exception e) {
logger.error("接收TCP数据出现错误", e);
}
}

public static void startServer(int port, Class obj){
ServerSocket serverSocket;
try {
serverSocket = new ServerSocket(port);
logger.info("TCP server 监听 : " + port);
while(true){
Socket e =null;
try {
// 在这里一直等待链接
e = serverSocket.accept();
// 一旦链接,将socket转入新进程进行处理,主进程重新监听新请求
logger.info("建立新连接...");
TCPServer server = (TCPServer) obj.newInstance();
server.socket = e;
server.start();
} catch (InstantiationException e1) {
logger.error("建立TCP连接错误",e1);
e.close();
} catch (IllegalAccessException e1) {
logger.error("建立TCP连接错误",e1);
e.close();
}
}
} catch (IOException e) {
logger.error("启动TCP监听错误",e);
}
}

}


多线程监听HTTP请求

通过Jersey和sun httpserver建立一个简单的HTTP服务即可。代码如下:

public class HTTPServer {

private static final Logger logger = LoggerFactory.getLogger(HTTPServer.class);

public static void start() {
try {
ContextLoader.load();
HttpServerProvider provider = HttpServerProvider.provider();
HttpServer httpserver =provider.createHttpServer(
new InetSocketAddress(Integer.parseInt(ConfigLoader.getInstance().getValue("local.http.port"))),
Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxhttphandler")));

httpserver.createContext(ContextLoader.contextPath, new CoreHandler());
httpserver.setExecutor(null);
httpserver.start();
logger.info("HTTP 服务开启成功 :" + Integer.parseInt(ConfigLoader.getInstance().getValue("local.http.port")));
} catch (NumberFormatException e) {
logger.error("HTTP 服务开启失败",e);
} catch (IOException e) {
logger.error("HTTP 服务开启失败",e);
}
}

}


Socket的处理

socket处理简单的接受inputstrea和输出outputstream就不细说了,这里说明我遇到的问题:

1、不管是zabbix_agent或是zabbix_server均没有输出结束标志,因此简单的通过DataoutputStream或者BufferedStreamReader的readLine()方式均会遇到程序不知道TCP数据合适传输结束,连接一直保持等待输入直到超时断开为止。

这种处理是不可行的,此处应该直接处理inputstream,将TCP数据一次性读到byte[]中,有多少读多少,然后直接开始处理。

byte[] responseData =
new byte[Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxbuffer"))];
int readCount = 0;
while (true) {
int read = is.read(responseData, 0, responseData.length - readCount);
if (read <= 0) {
break;
}
readCount += read;
}
byte[] r = new byte[readCount];
for(int i = 0 ; i < readCount; i ++){
r[i] = responseData[i];
}
// responseData缓冲预留太长,r是最终的byte[]


2、遇到与server交互,既需要TCP输入也需要TCP输出的交互情景,inputstream和outputstream一定要同时获取到

outputstream要flush后才能获得服务器响应。

Socket socket = new Socket(ip, port);
OutputStream os = socket.getOutputStream();
InputStream is = socket.getInputStream();

logger.info("开始输出socket...");
os.write(content);
os.flush();
logger.info("开始读入socket...");
byte[] responseData = new byte[Integer.parseInt(ConfigLoader.getInstance().getValue("app.maxbuffer"))];
int readCount = 0;
while (true) {
int read = is.read(responseData, 0, responseData.length - readCount);
if (read <= 0) {
break;
}
readCount += read;
}
byte[] r = new byte[readCount];
for(int i = 0 ; i < readCount; i ++){
r[i] = responseData[i];
}
logger.info("关闭socket...");
os.close();
is.close();
socket.close();
logger.info("读入socket结束,接收返回:"+new String(r));
return r;


HTTP数据封装

如果需要传递byte[],一定要经过Base64编码,同时,如果包装过返回数据结构,并通过fastJson或者其他序列化工具处理的话,目前由于为止原因,fastJson无法处理
HashMap<String, byte[]>
这种类型,所以本文程序我只通过HTTP传输Base64处理后的String。

效果

客户端

zabbix_agent请求被桥接器监听到->通过HTTP发送->接收到HTTP返回->将返回数据通过TCP发送回zabbix_agent



服务端

桥接器接到HTTP请求->转为TCP数据发送给zabbix_server->接收到zabbix_server响应->通过HTTP发回响应



zabbix_server监控页面效果

成功在server端注册agent,并上传了一个监控数据。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  tcp http zabbix 桥接 转发