kafka rest api方式写入——java程序
2018-01-23 14:08
495 查看
kafka rest api方式写入——java程序
本文主要介绍通过rest api方的方式写入kafka本文前提是搭建一个kafka的proxy,然后启动它。
下面上货。
private static String startPost(String path, List<LogBlock> logBlockList) { if (path.equals("") || logBlockList == null || logBlockList.size() == 0) { return ""; } OutputStreamWriter out = null; BufferedReader in = null; StringBuilder result = new StringBuilder(); try { URL url = new URL(path); HttpURLConnection conn = (HttpURLConnection) url.openConnection(); conn.setRequestMethod("POST"); // 发送POST请求必须设置为true conn.setDoOutput(true); conn.setDoInput(true); //设置连接超时时间和读取超时时间 conn.setConnectTimeout(10000); conn.setReadTimeout(10000); conn.setRequestProperty("Content-Type", "application/vnd.kafka.json.v2+json"); conn.setRequestProperty("Accept", "application/vnd.kafka.v2+json, application/vnd.kafka+json, application/json"); out = new OutputStreamWriter(conn.getOutputStream()); StringBuffer sb = new StringBuffer(); sb.append("{").append("\"records\": ["); for (int i = 0; i < logBlockList.size(); i++) { if (i == 0) { sb.append("{").append("\"value\": ").append(logBlockList.get(i)).append("}"); } else { sb.append(",{").append("\"value\": ").append(logBlockList.get(i)).append("}"); } } sb.append("]").append("}"); out.append(sb.toString()); out.flush(); out.close(); // 取得输入流,并使用Reader读取 in = new BufferedReader(new InputStreamReader(conn.getInputStream(), "UTF-8")); String line; while ((line = in.readLine()) != null) { result.append(line); } } catch (Exception e) { e.printStackTrace(); } //关闭输出流、输入流 finally { try { if (out != null) { out.close(); } if (in != null) { in.close(); } } catch (IOException ex) { ex.printStackTrace(); } } return result.toString(); }
程序调用的时候这样:
startPost("http://192.168.0.66:48082/topics/test", list);
这样就可以实现通过rest api方式批量写入kafka了。
相关文章推荐
- 在Java程序中调用Salesforce REST API
- java hbase api 批量高效写入数据(线程池方式)
- 如何为java程序创建快捷方式
- java编写TCP方式的通信程序
- jax-rs(Java API for RESTful Web Services) 实践教程 之四 —— @Context注入HttpServletRequest 使REST保持状态!
- 关于java程序的运行方式.
- 改进Java中的对象管理方式,提高程序性能
- 使用WIN32 API CreateProcess()以无窗口方式创建DOS程序
- opensession()和getCurrentSession()方法的区别(JTA(java Transaction Api,分布式事务)事务和Connection事务:数据库自带的事务处理方式)
- 字符串处理是许多程序中非常重要的一部分,它们可以用于文本显示,数据表示,查找键和很多目的.在Unix下,用户可以使用正则表达式的强健功能实现这些 目的,从Java1.4起,Java核心API就引入了java.util.regex程序包,它是一种有价值的基础
- java application应用程序 使用JDBC和proxool两种方式连接数据库 的测试程序代码
- java Swing中弹出对话框的几种方式与java 对话框 JOptionPane类的api
- java程序通过密钥方式使用JSch API访问SSH(转帖)
- jax-rs(Java API for RESTful Web Services) 实践教程 之五 —— 注入全局变量 和 rest的生命周期
- java编写TCP方式的通信程序
- Java5增加了新的类库并发集java.util.concurrent,该类库为并发程序提供了丰富的API
- java程序用post方式给某一网页传递参数
- 选择运行 Java 程序的方式
- 使用WIN32 API CreateProcess()以无窗口方式创建DOS程序
- java编写TCP方式的通信程序