您的位置:首页 > 编程语言 > Java开发

kafka rest api方式写入——java程序

2018-01-23 14:08 495 查看

kafka rest api方式写入——java程序

本文主要介绍通过rest api方的方式写入kafka

本文前提是搭建一个kafka的proxy,然后启动它。

下面上货。
private static String startPost(String path, List<LogBlock> logBlockList) {
if (path.equals("") || logBlockList == null || logBlockList.size() == 0) {
return "";
}
OutputStreamWriter out = null;
BufferedReader in = null;
StringBuilder result = new StringBuilder();
try {
URL url = new URL(path);
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("POST");
// 发送POST请求必须设置为true
conn.setDoOutput(true);
conn.setDoInput(true);
//设置连接超时时间和读取超时时间
conn.setConnectTimeout(10000);
conn.setReadTimeout(10000);
conn.setRequestProperty("Content-Type", "application/vnd.kafka.json.v2+json");
conn.setRequestProperty("Accept", "application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json");

out = new OutputStreamWriter(conn.getOutputStream());
StringBuffer sb = new StringBuffer();
sb.append("{").append("\"records\": [");
for (int i = 0; i < logBlockList.size(); i++) {
if (i == 0) {
sb.append("{").append("\"value\": ").append(logBlockList.get(i)).append("}");
} else {
sb.append(",{").append("\"value\": ").append(logBlockList.get(i)).append("}");
}
}
sb.append("]").append("}");
out.append(sb.toString());
out.flush();
out.close();
// 取得输入流,并使用Reader读取
in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8"));
String line;
while ((line = in.readLine()) != null) {
result.append(line);
}

} catch (Exception e) {
e.printStackTrace();
}
//关闭输出流、输入流
finally {
try {
if (out != null) {
out.close();
}
if (in != null) {
in.close();
}
} catch (IOException ex) {
ex.printStackTrace();
}
}
return result.toString();
}


程序调用的时候这样:
startPost("http://192.168.0.66:48082/topics/test", list);


这样就可以实现通过rest api方式批量写入kafka了。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: 
相关文章推荐