Elasticsearch Java:基于 RestClient API 的增删改查
2017-12-11 10:56
465 查看
RestClient 客户端工具类:封装 5 种请求方式(PUT、POST、GET、HEAD、DELETE)
public class EsRestClient { private static final String PUT = "PUT"; private static final String POST = "POST"; private static final String GET = "GET"; private static final String HEAD = "HEAD"; private static final String DELETE = "DELETE"; public static RestClient getClient(){ return getClient(CommonResource.cloud_es_dm_ip, CommonResource.cloud_es_dm_rest_port); } public static RestClient getClient(String ip, Integer port){ return RestClient.builder(new HttpHost(ip,port)).setMaxRetryTimeoutMillis(6000).build(); } public static String sendGet(String ip,Integer port,String index,String type,String query){ RestClient restClient = getClient(ip,port); String rs = null; try { HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON); String endpoint = "/"+index+"/"+type+"/_search"; if(StringUtils.isBlank(type)){ endpoint = "/"+index+"/_search"; } Response response = restClient.performRequest(GET, endpoint, Collections.singletonMap("pretty", "true"),entity); rs = EntityUtils.toString(response.getEntity()); } catch (IOException e) { e.printStackTrace(); }finally { close(restClient); } return rs; } public static String sendPost(List<String> indexs,List<String> types,String query){ RestClient restClient = getClient(); String rs = null; try [/b]{ String index = StringUtils.join(indexs, ","); String type = "/"; if(Objects.nonNull(types) && !types.isEmpty()){ type += StringUtils.join(types, ",")+"/"; } HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON); String endpoint = "/"+index+type+"_search"; Response response = restClient.performRequest(POST, endpoint, Collections.singletonMap("pretty", "true"),entity); rs = EntityUtils.toString(response.getEntity()); } catch (IOException e) { e.printStackTrace(); }finally { close(restClient); } return rs; } public static String sendPut(String ip, int port,String index,String type,String id,String data){ RestClient restClient = getClient(ip,port); String rs = null; try { HttpEntity entity = new 
NStringEntity(data, ContentType.APPLICATION_JSON); String requestType = POST; String endpoint = "/"+index+"/"+type; if(StringUtils.isNoneBlank(id)){ requestType = PUT; endpoint = "/"+index+"/"+type+"/"+id; } Response response = restClient.performRequest(requestType, endpoint, Collections.singletonMap("pretty", "true"),entity); rs = EntityUtils.toString(response.getEntity()); } catch (IOException e) { e.printStackTrace(); }finally { close(restClient); } return rs; } public static String sendDelete(RestClient restClient,String index,String type){ return sendDelete(restClient,index,type,null); } public static String sendDelete(RestClient restClient,String index,String type,String id){ String rs = null; try { String endpoint = "/"+index+"/"+type+"/"+id; if(StringUtils.isBlank(id)){ endpoint = "/"+index+"/"+type; }else if(StringUtils.isBlank(type)){ endpoint = "/"+index; } Response response = restClient.performRequest(DELETE, endpoint); rs = EntityUtils.toString(response.getEntity()); } catch (IOException e) { e.printStackTrace(); }finally { close(restClient); } return rs; } public static boolean sendHead(RestClient restClient,String index,String type){ int code = 200; try { String endpoint = "/"+index+"/"+type; // String endpoint = "/"+index+"/_mapping/"+type;//5.x if(StringUtils.isBlank(type)){ endpoint = "/"+index; } Response response = restClient.performRequest(HEAD, endpoint);//200存在,404不存在 code = response.getStatusLine().getStatusCode(); } catch (IOException e) { e.printStackTrace(); }finally { close(restClient); } return code == 200?true:false; } public static void close(RestClient restClient){ if(Objects.nonNull(restClient)){ try { restClient.close(); } catch (IOException e) { e.printStackTrace(); } } } }
基于 RestClient 的 Service 实现类
@Servicepublic class MyDataElasticSearchDriverImpl implements IMyDataDriver {private static final Logger log = Logger.getLogger(MyDataElasticSearchDriverImpl.class);@Overridepublic List<String> getTables(String ip, Integer port, String username,String password, String database) {List<String> tables = Lists.newArrayList();RestClient restClient = EsRestClient.getClient(ip,port);Map<String,Object> requestData = Maps.newHashMap();requestData.put("from", 1);requestData.put("size", 100);HttpEntity entity = new NStringEntity(JSON.toJSONString(requestData), ContentType.APPLICATION_JSON);Response response;JSONObject jsonObj = null;try {response = restClient.performRequest("GET", "/"+database, Collections.singletonMap("pretty", "true"),entity);jsonObj = JSON.parseObject(EntityUtils.toString(response.getEntity()));} catch (IOException e) {e.printStackTrace();}@SuppressWarnings("unchecked")Map<String,Object> mappings = (Map<String,Object>)(jsonObj.getJSONObject(database).get("mappings"));for (Map.Entry<String,Object> m : mappings.entrySet()) {tables.add(m.getKey());}return tables;}@Overridepublic List<String> getDatabases(String ip, Integer port, String username,String password) {List<String> dataBases = Lists.newArrayList();RestClient restClient = EsRestClient.getClient(ip, port);try {Response response = restClient.performRequest("GET", "/_cat/indices?v", Collections.singletonMap("pretty", "true"));String rs = EntityUtils.toString(response.getEntity());BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(rs.getBytes(Charset.forName("utf8"))), Charset.forName("utf8")));String line = null;int index = 0;while((line = br.readLine()) != null){if(index > 0){List<String> indices = Lists.newLinkedList();String[] data = line.split(" ");int a=0;for (String s : data) {if(!"".equals(s)){a++;}if(a==3) {dataBases.add(s);a=0;break;}}}index++;}br.close();} catch (IOException e) {e.printStackTrace();}EsRestClient.close(restClient);return 
dataBases;}@Overridepublic List<String> getFields(String ip, Integer port, String username,String password, String database, String table) {List<String> fields = Lists.newArrayList();RestClient restClient = EsRestClient.getClient(ip, port);try {Response response = restClient.performRequest("GET", "/"+database+"/_mapping/"+table, Collections.singletonMap("pretty", "true"));String rs = EntityUtils.toString(response.getEntity());JSONObject jsonObj = JSON.parseObject(rs);@SuppressWarnings("unchecked")Map<String,Object> properties = (Map<String,Object>)(jsonObj.getJSONObject(database).getJSONObject("mappings").getJSONObject(table).get("properties"));for (Map.Entry<String,Object> m : properties.entrySet()) {fields.add(m.getKey());}} catch (IOException e) {e.printStackTrace();}EsRestClient.close(restClient);return fields;}@Overridepublic RestData<Map<String, Object>> getData(String ip, Integer port,String username, String password, String database, String table,int start, int offset) {List<Map<String,Object>> rsList =new ArrayList<>();long rsCount = 0l;Map<String,Object> requestData = Maps.newHashMap();requestData.put("from", start);requestData.put("size", offset);String rs = EsRestClient.sendGet(ip,port,database, table, JSON.toJSONString(requestData));try {JSONObject jsonObj = JSON.parseObject(rs);JSONObject hits = jsonObj.getJSONObject("hits");JSONArray arr = hits.getJSONArray("hits");if(Objects.nonNull(arr)){rsCount = hits.getLong("total");if(rsCount > 0){for (int i = 0;i < arr.size();i++) {String _id = arr.getJSONObject(i).getString("_id");JSONObject obj = arr.getJSONObject(i).getJSONObject("_source");obj.put("_id", _id);Map<String,Object> map = (Map<String,Object>)(obj);rsList.add(map);}}}} catch (Exception e) {log.error(e);}return new RestData<>(rsList,rsCount);}@Overridepublic RestData<Map<String,Object>> searchData(String ip, Integer port, String username, String password, List<String> databases, List<String> tables, List<String> fields, String query,String sort, 
int start, int offset) {Map<String,Object> requestData = Maps.newHashMap();if(Objects.nonNull(fields) && fields.size()>0){requestData.put("_source", fields);}if(StringUtils.isNoneBlank(query)){requestData.put("query",JSON.parse(query));requestData.put("from", start);requestData.put("size", offset);}if(StringUtils.isNoneBlank(sort)){requestData.put("sort",JSON.parse(sort));}String rs = EsRestClient.sendPost(databases, tables, JSON.toJSONString(requestData));JSONObject jsonObj = JSON.parseObject(rs);JSONObject hits = jsonObj.getJSONObject("hits");JSONArray arr = hits.getJSONArray("hits");List<Map<String,Object>> rsList = new ArrayList<>(offset);long rsCount = 0;if(Objects.nonNull(arr)){rsCount = hits.getLong("total");if(rsCount > 0){for (int i = 0;i < arr.size();i++) {Map<String,Object> map = (Map<String,Object>)(arr.getJSONObject(i).getJSONObject("_source"));map.put("_id", arr.getJSONObject(i).getString("_id"));rsList.add(map);}}}return new RestData<>(rsList,rsCount);}@Overridepublic StatDataBean statDimension1(String ip, Integer port, String username, String password, List<String> databases, List<String> tables,String aggs,String query, Integer returnType, Integer pageSize) {StatDataBean rs = new StatDataBean();List<String> xdata = new ArrayList<>();List<Double> sdata = new ArrayList<>();Map<String,Object> requestData = Maps.newHashMap();try {requestData.put("aggs", JSON.parse(aggs));requestData.put("size", 0);if(query!=null && !query.equals("")){requestData.put("query", JSON.parse(query));}String rsData = EsRestClient.sendPost(databases, tables, JSON.toJSONString(requestData));JSONObject jsonObj = JSON.parseObject(rsData);JSONObject byKey = jsonObj.getJSONObject("aggregations").getJSONObject("colors");JSONArray arr = byKey.getJSONArray("buckets");if(Objects.nonNull(arr)){for (int i = 0;i < arr.size();i++) {JSONObject obj = arr.getJSONObject(i);String k = obj.getString("key_as_string");k = Objects.isNull(k)?obj.getString("key"):k;long v = 
obj.getLong("doc_count");if(returnType== StatDataBean.returnType.KV){rs.addKVData2Series(k, v);}else{xdata.add(k);sdata.add(Double.valueOf(v));}}}} catch (Exception e) {throw e;}rs.addData2X(xdata);rs.addData2Series("",sdata);return rs;}public StatDataBean statDimension2(String ip, Integer port, String username, String password, List<String> databases, List<String> tables,String aggs, String query, Integer returnType, Integer pageSize) {StatDataBean rs = new StatDataBean();Table<String, String, Double> rsTable = HashBasedTable.create();Map<String,Object> requestData = Maps.newHashMap();try {requestData.put("aggs", JSON.parse(aggs));requestData.put("size", 0);if(query!=null && !query.equals("")){requestData.put("query", JSON.parse(query));}String rsData = EsRestClient.sendPost(databases, tables, JSON.toJSONString(requestData));JSONObject jsonObj = JSON.parseObject(rsData);JSONObject byKey = jsonObj.getJSONObject("aggregations").getJSONObject("by_key1");JSONArray arr = byKey.getJSONArray("buckets");if(Objects.nonNull(arr)){for (int i = 0;i < arr.size();i++) {JSONObject obj1 = arr.getJSONObject(i);String k1 = obj1.getString("key_as_string");k1 = Objects.isNull(k1)?obj1.getString("key"):k1;JSONObject byKey2 = obj1.getJSONObject("by_key2");JSONArray arr2 = byKey2.getJSONArray("buckets");for (int j = 0;j < arr2.size();j++) {JSONObject obj2 = arr2.getJSONObject(j);String k2 = obj2.getString("key_as_string");k2 = Objects.isNull(k2)?obj2.getString("key"):k2;long v = obj2.getLong("doc_count");rsTable.put(k1, k2, Double.valueOf(v));}}}List<String> xdata = new ArrayList<>();xdata.addAll(rsTable.rowKeySet());rs.addData2X(xdata);for(String k2 : rsTable.columnKeySet()){List<Double> sdata = new ArrayList<>();for(String k1 : xdata){Double v = rsTable.get(k1, k2);v = Objects.isNull(v)?0.0:v;sdata.add(v);}rs.addData2Series(k2,sdata);}} catch (Exception e) {throw e;}return rs;}/* public boolean checkIndexIsExist(String ip, int port, String database){return checkTypeIsExist(ip, port, 
database, null);}*/public boolean checkTypeIsExist(RestClient restClient, String database, String table){return EsRestClient.sendHead(restClient, database, table);}@Overridepublic int createTable(String ip, Integer port, String database, String table,List<MyDataTableCreateBean> colList) {try{RestClient restClient = EsRestClient.getClient(ip, port);if(checkTypeIsExist(restClient, database, table))return 0;Map<String,Object> type = Maps.newHashMap();Map<String,Object> properties = Maps.newHashMap();Map<String,Object> fields = Maps.newHashMap();for(MyDataTableCreateBean col : colList){Map<String,Object> field = Maps.newHashMap();//字段类型switch(col.getType()){case MyDataTableCreateBean.Type.Integer:field.put("type", "integer");break;case MyDataTableCreateBean.Type.Long:field.put("type", "long");break;case MyDataTableCreateBean.Type.Date:field.put("type", "date");field.put("format", "yyyyMMdd || yyyy-MM-dd HH:mm:ss");break;default :field.put("type", "text");// field.put("type", "text");break;}if(col.getIsIndex()>0){//field.put("index_analyzer", "ik");// field.put("analyzer", "ik_max_word");}else{field.put("index", "not_analyzed");}fields.put(col.getField(), field);}properties.put("properties", fields);type.put(table, properties);String mappings = "{\"mappings\":"+JSON.toJSONString(type)+"}";String endPoint = "/"+database;if(EsRestClient.sendHead(restClient, database, "")){//已存在mappings = JSON.toJSONString(properties);endPoint = "/"+database+"/_mapping/"+table;}HttpEntity entity = new NStringEntity(mappings, ContentType.APPLICATION_JSON);Response response = restClient.performRequest("PUT", endPoint, Collections.singletonMap("pretty", "true"),entity);JSONObject jsonObj = JSON.parseObject(EntityUtils.toString(response.getEntity()));if(jsonObj.getBooleanValue("acknowledged")){return 1;}EsRestClient.close(restClient);} catch (Exception e) {log.error(e);return 0;}return 1;}@Overridepublic int dropTable(RestClient restClient, String database, String table){int flag = 0;try 
{if(checkTypeIsExist(restClient, database, table)){//索引或type存在String rs = EsRestClient.sendDelete(restClient,database,table);JSONObject jsonObj = JSON.parseObject(rs);if(jsonObj.getBooleanValue("acknowledged")){flag = 1;}}} catch (Exception e) {log.error(e);return flag;}return flag;}@Overridepublic void dropData(RestClient restClient, String database, String table, String id) {EsRestClient.sendDelete(restClient,database,table,id);}@Overridepublic void addData(String ip, int port, String database, String table,String objId, String data) {EsRestClient.sendPut(ip, port, database, table, objId, data);}}测试
public Boolean addMapping2(String index, String type,List<MyDataTableCreateBean> colList) {String es_ip = CommonResource.cloud_es_dm_ip;int es_port = CommonResource.cloud_es_dm_tcp_port;try{//this.myDataElasticSearchDriver.createTable(es_ip, CommonResource.cloud_es_dm_rest_port, index, type, colList);List<String> databases=new ArrayList<>();databases.add("cars");List<String> tables=new ArrayList<>();tables.add("transactions");List<String> fields=new ArrayList<>();fields.add("title");fields.add("cooperationWay");fields.add("type");String query="{\"match\":{\"title\":\"技\"}}";String aggs="{\"colors\":{\"terms\":{ \"field\": \"color\"}}}";Integer returnType=0;Integer pageSize=0;int start=0;int offset=100;//RestData<Map<String, Object>> mapRestData = this.myDataElasticSearchDriver.searchData(es_ip, CommonResource.cloud_es_dm_rest_port, index, type, databases, tables,fields,query,sort,start,offset);StatDataBean statDataBean = this.myDataElasticSearchDriver.statDimension1(es_ip, CommonResource.cloud_es_dm_rest_port, index, type, databases, tables, aggs, "", returnType, pageSize);for(String s :tables)System.out.println(s);return true;}catch (Exception e){return false;}}
相关文章推荐
- elasticsearch java 基于TransportClient API的增删改查
- 【异常】elasticsearch的RestClientAPI请求超时问题:java.lang.RuntimeException: error while performing request
- Java中REST API使用示例——基于云平台+云服务打造自己的在线翻译工具
- ElasticSearch5.4.3 环境搭建 2017 (5-Java Client Security Api x-pack)
- JIRA REST java client API实际应用
- elasticsearch 5.0 获取 TransportClient 操作客户端java API
- Elasticsearch系列(七)----JAVA客户端之RestClient操作详解
- Elasticsearch Java Rest Client API 整理总结 (一)
- Java - 在WebService中使用Client调用三方的RestAPI
- ElasticSearch的javaAPI之Client
- 使用Java Rest Client操作Elasticsearch
- ElasticSearch java API--创建Client连接
- 使用Java Rest Client操作Elasticsearch
- elasticsearch5.6.4 RestClient 索引操作API详细的注释给力篇值得拥有
- elasticsearch入门教程一(基于JAVA client 针对5.1版本)
- ElasticSearch6.1的增删改查Rest API
- ElasticSearch的javaAPI之Client
- JIRA Rest JAVA Client API实现问题管理及自定义字段(原创)
- 基于Elasticsearch2.1.1的JavaAPI基本操作代码示例
- 提供一个wink restclient用get访问restapi的例子(java语言)