您的位置:首页 > 编程语言 > Java开发

elasticsearch java 基于 RestClientAPI 的增删改查

2017-12-11 10:56 465 查看
RestClient 类客户端 5种请求方式
public class EsRestClient {

private static final String PUT = "PUT";
private static final String POST = "POST";
private static final String GET = "GET";
private static final String HEAD = "HEAD";
private static final String DELETE = "DELETE";

public static RestClient getClient(){
return getClient(CommonResource.cloud_es_dm_ip, CommonResource.cloud_es_dm_rest_port);
}

public static RestClient getClient(String ip, Integer port){
return RestClient.builder(new HttpHost(ip,port)).setMaxRetryTimeoutMillis(6000).build();
}

public static String sendGet(String ip,Integer port,String index,String type,String query){
RestClient restClient = getClient(ip,port);
String  rs = null;
try {
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
String endpoint = "/"+index+"/"+type+"/_search";
if(StringUtils.isBlank(type)){
endpoint = "/"+index+"/_search";
}
Response response = restClient.performRequest(GET, endpoint, Collections.singletonMap("pretty", "true"),entity);
rs = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
e.printStackTrace();
}finally {
close(restClient);
}
return rs;
}

public static String sendPost(List<String> indexs,List<String> types,String query){
RestClient restClient = getClient();
String  rs = null;
try [/b]{
String index = StringUtils.join(indexs, ",");
String type = "/";
if(Objects.nonNull(types) && !types.isEmpty()){
type += StringUtils.join(types, ",")+"/";
}
HttpEntity entity = new NStringEntity(query, ContentType.APPLICATION_JSON);
String endpoint = "/"+index+type+"_search";
Response response = restClient.performRequest(POST, endpoint, Collections.singletonMap("pretty", "true"),entity);
rs = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
e.printStackTrace();
}finally {
close(restClient);
}
return rs;
}

public static String sendPut(String ip, int port,String index,String type,String id,String data){
RestClient restClient = getClient(ip,port);
String  rs = null;
try {
HttpEntity entity = new NStringEntity(data, ContentType.APPLICATION_JSON);
String requestType = POST;
String endpoint = "/"+index+"/"+type;
if(StringUtils.isNoneBlank(id)){
requestType = PUT;
endpoint = "/"+index+"/"+type+"/"+id;
}
Response response = restClient.performRequest(requestType, endpoint, Collections.singletonMap("pretty", "true"),entity);
rs = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
e.printStackTrace();
}finally {
close(restClient);
}
return rs;
}

public static String sendDelete(RestClient restClient,String index,String type){
return sendDelete(restClient,index,type,null);
}

public static String sendDelete(RestClient restClient,String index,String type,String id){

String rs = null;
try {
String endpoint = "/"+index+"/"+type+"/"+id;
if(StringUtils.isBlank(id)){
endpoint = "/"+index+"/"+type;
}else if(StringUtils.isBlank(type)){
endpoint = "/"+index;
}
Response response = restClient.performRequest(DELETE, endpoint);
rs = EntityUtils.toString(response.getEntity());
} catch (IOException e) {
e.printStackTrace();
}finally {
close(restClient);
}
return rs;
}

public static boolean sendHead(RestClient restClient,String index,String type){
int code = 200;
try {
String endpoint = "/"+index+"/"+type;
//       String endpoint = "/"+index+"/_mapping/"+type;//5.x
if(StringUtils.isBlank(type)){
endpoint = "/"+index;
}
Response response = restClient.performRequest(HEAD, endpoint);//200存在,404不存在
code = response.getStatusLine().getStatusCode();
} catch (IOException e) {
e.printStackTrace();
}finally {
close(restClient);
}
return code == 200?true:false;
}

public static void close(RestClient restClient){
if(Objects.nonNull(restClient)){
try {
restClient.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
}
RestClient service类
@Servicepublic class MyDataElasticSearchDriverImpl implements IMyDataDriver {private static final Logger log = Logger.getLogger(MyDataElasticSearchDriverImpl.class);@Overridepublic List<String> getTables(String ip, Integer port, String username,String password, String database)  {List<String> tables = Lists.newArrayList();RestClient restClient = EsRestClient.getClient(ip,port);Map<String,Object> requestData = Maps.newHashMap();requestData.put("from", 1);requestData.put("size", 100);HttpEntity entity = new NStringEntity(JSON.toJSONString(requestData), ContentType.APPLICATION_JSON);Response response;JSONObject jsonObj = null;try {response = restClient.performRequest("GET", "/"+database, Collections.singletonMap("pretty", "true"),entity);jsonObj = JSON.parseObject(EntityUtils.toString(response.getEntity()));} catch (IOException e) {e.printStackTrace();}@SuppressWarnings("unchecked")Map<String,Object> mappings = (Map<String,Object>)(jsonObj.getJSONObject(database).get("mappings"));for (Map.Entry<String,Object> m : mappings.entrySet()) {tables.add(m.getKey());}return tables;}@Overridepublic List<String> getDatabases(String ip, Integer port, String username,String password)  {List<String> dataBases = Lists.newArrayList();RestClient restClient = EsRestClient.getClient(ip, port);try {Response response = restClient.performRequest("GET", "/_cat/indices?v", Collections.singletonMap("pretty", "true"));String rs = EntityUtils.toString(response.getEntity());BufferedReader br = new BufferedReader(new InputStreamReader(new ByteArrayInputStream(rs.getBytes(Charset.forName("utf8"))), Charset.forName("utf8")));String line = null;int index = 0;while((line = br.readLine()) != null){if(index > 0){List<String> indices = Lists.newLinkedList();String[] data = line.split(" ");int a=0;for (String s : data) {if(!"".equals(s)){a++;}if(a==3) {dataBases.add(s);a=0;break;}}}index++;}br.close();} catch (IOException e) {e.printStackTrace();}EsRestClient.close(restClient);return dataBases;}@Overridepublic List<String> getFields(String ip, Integer port, String username,String password, String database, String table)  {List<String> fields = Lists.newArrayList();RestClient restClient = EsRestClient.getClient(ip, port);try {Response response = restClient.performRequest("GET", "/"+database+"/_mapping/"+table, Collections.singletonMap("pretty", "true"));String rs = EntityUtils.toString(response.getEntity());JSONObject jsonObj = JSON.parseObject(rs);@SuppressWarnings("unchecked")Map<String,Object> properties = (Map<String,Object>)(jsonObj.getJSONObject(database).getJSONObject("mappings").getJSONObject(table).get("properties"));for (Map.Entry<String,Object> m : properties.entrySet()) {fields.add(m.getKey());}} catch (IOException e) {e.printStackTrace();}EsRestClient.close(restClient);return fields;}@Overridepublic RestData<Map<String, Object>> getData(String ip, Integer port,String username, String password, String database, String table,int start, int offset)  {List<Map<String,Object>> rsList =new ArrayList<>();long rsCount = 0l;Map<String,Object> requestData = Maps.newHashMap();requestData.put("from", start);requestData.put("size", offset);String rs = EsRestClient.sendGet(ip,port,database, table, JSON.toJSONString(requestData));try {JSONObject jsonObj = JSON.parseObject(rs);JSONObject hits = jsonObj.getJSONObject("hits");JSONArray arr = hits.getJSONArray("hits");if(Objects.nonNull(arr)){rsCount = hits.getLong("total");if(rsCount > 0){for (int i = 0;i < arr.size();i++) {String _id = arr.getJSONObject(i).getString("_id");JSONObject obj = arr.getJSONObject(i).getJSONObject("_source");obj.put("_id", _id);Map<String,Object> map = (Map<String,Object>)(obj);rsList.add(map);}}}} catch (Exception e) {log.error(e);}return new RestData<>(rsList,rsCount);}@Overridepublic RestData<Map<String,Object>> searchData(String ip, Integer port, String username, String password, List<String> databases, List<String> tables, List<String> fields, String query,String sort, int start, int offset) {Map<String,Object> requestData = Maps.newHashMap();if(Objects.nonNull(fields) && fields.size()>0){requestData.put("_source", fields);}if(StringUtils.isNoneBlank(query)){requestData.put("query",JSON.parse(query));requestData.put("from", start);requestData.put("size", offset);}if(StringUtils.isNoneBlank(sort)){requestData.put("sort",JSON.parse(sort));}String rs = EsRestClient.sendPost(databases, tables, JSON.toJSONString(requestData));JSONObject jsonObj = JSON.parseObject(rs);JSONObject hits = jsonObj.getJSONObject("hits");JSONArray arr = hits.getJSONArray("hits");List<Map<String,Object>> rsList = new ArrayList<>(offset);long rsCount = 0;if(Objects.nonNull(arr)){rsCount = hits.getLong("total");if(rsCount > 0){for (int i = 0;i < arr.size();i++) {Map<String,Object> map = (Map<String,Object>)(arr.getJSONObject(i).getJSONObject("_source"));map.put("_id", arr.getJSONObject(i).getString("_id"));rsList.add(map);}}}return new RestData<>(rsList,rsCount);}@Overridepublic StatDataBean statDimension1(String ip, Integer port, String username, String password, List<String> databases, List<String> tables,String aggs,String query, Integer returnType, Integer pageSize)  {StatDataBean rs = new StatDataBean();List<String> xdata = new ArrayList<>();List<Double> sdata = new ArrayList<>();Map<String,Object> requestData = Maps.newHashMap();try {requestData.put("aggs", JSON.parse(aggs));requestData.put("size", 0);if(query!=null && !query.equals("")){requestData.put("query", JSON.parse(query));}String rsData = EsRestClient.sendPost(databases, tables, JSON.toJSONString(requestData));JSONObject jsonObj = JSON.parseObject(rsData);JSONObject byKey = jsonObj.getJSONObject("aggregations").getJSONObject("colors");JSONArray arr = byKey.getJSONArray("buckets");if(Objects.nonNull(arr)){for (int i = 0;i < arr.size();i++) {JSONObject obj = arr.getJSONObject(i);String k = obj.getString("key_as_string");k = Objects.isNull(k)?obj.getString("key"):k;long v = obj.getLong("doc_count");if(returnType== StatDataBean.returnType.KV){rs.addKVData2Series(k, v);}else{xdata.add(k);sdata.add(Double.valueOf(v));}}}} catch (Exception e) {throw e;}rs.addData2X(xdata);rs.addData2Series("",sdata);return rs;}public StatDataBean statDimension2(String ip, Integer port, String username, String password, List<String> databases, List<String> tables,String aggs, String query, Integer returnType, Integer pageSize)  {StatDataBean rs = new StatDataBean();Table<String, String, Double> rsTable = HashBasedTable.create();Map<String,Object> requestData = Maps.newHashMap();try {requestData.put("aggs", JSON.parse(aggs));requestData.put("size", 0);if(query!=null && !query.equals("")){requestData.put("query", JSON.parse(query));}String rsData = EsRestClient.sendPost(databases, tables, JSON.toJSONString(requestData));JSONObject jsonObj = JSON.parseObject(rsData);JSONObject byKey = jsonObj.getJSONObject("aggregations").getJSONObject("by_key1");JSONArray arr = byKey.getJSONArray("buckets");if(Objects.nonNull(arr)){for (int i = 0;i < arr.size();i++) {JSONObject obj1 = arr.getJSONObject(i);String k1 = obj1.getString("key_as_string");k1 = Objects.isNull(k1)?obj1.getString("key"):k1;JSONObject byKey2 = obj1.getJSONObject("by_key2");JSONArray arr2 =  byKey2.getJSONArray("buckets");for (int j = 0;j < arr2.size();j++) {JSONObject obj2 = arr2.getJSONObject(j);String k2 = obj2.getString("key_as_string");k2 = Objects.isNull(k2)?obj2.getString("key"):k2;long v = obj2.getLong("doc_count");rsTable.put(k1, k2, Double.valueOf(v));}}}List<String> xdata = new ArrayList<>();xdata.addAll(rsTable.rowKeySet());rs.addData2X(xdata);for(String k2 : rsTable.columnKeySet()){List<Double> sdata = new ArrayList<>();for(String k1 : xdata){Double v = rsTable.get(k1, k2);v = Objects.isNull(v)?0.0:v;sdata.add(v);}rs.addData2Series(k2,sdata);}} catch (Exception e) {throw e;}return rs;}/* public boolean checkIndexIsExist(String ip, int port, String database){return checkTypeIsExist(ip, port, database, null);}*/public boolean checkTypeIsExist(RestClient restClient, String database, String table){return EsRestClient.sendHead(restClient, database, table);}@Overridepublic int createTable(String ip, Integer port, String database, String table,List<MyDataTableCreateBean> colList)  {try{RestClient restClient = EsRestClient.getClient(ip, port);if(checkTypeIsExist(restClient, database, table))return 0;Map<String,Object> type = Maps.newHashMap();Map<String,Object> properties = Maps.newHashMap();Map<String,Object> fields = Maps.newHashMap();for(MyDataTableCreateBean col : colList){Map<String,Object> field = Maps.newHashMap();//字段类型switch(col.getType()){case MyDataTableCreateBean.Type.Integer:field.put("type", "integer");break;case MyDataTableCreateBean.Type.Long:field.put("type", "long");break;case MyDataTableCreateBean.Type.Date:field.put("type", "date");field.put("format", "yyyyMMdd || yyyy-MM-dd HH:mm:ss");break;default :field.put("type", "text");//                field.put("type", "text");break;}if(col.getIsIndex()>0){//field.put("index_analyzer", "ik");//             field.put("analyzer", "ik_max_word");}else{field.put("index", "not_analyzed");}fields.put(col.getField(), field);}properties.put("properties", fields);type.put(table, properties);String mappings = "{\"mappings\":"+JSON.toJSONString(type)+"}";String endPoint =  "/"+database;if(EsRestClient.sendHead(restClient, database, "")){//已存在mappings = JSON.toJSONString(properties);endPoint = "/"+database+"/_mapping/"+table;}HttpEntity entity = new NStringEntity(mappings, ContentType.APPLICATION_JSON);Response response = restClient.performRequest("PUT", endPoint, Collections.singletonMap("pretty", "true"),entity);JSONObject jsonObj = JSON.parseObject(EntityUtils.toString(response.getEntity()));if(jsonObj.getBooleanValue("acknowledged")){return 1;}EsRestClient.close(restClient);} catch (Exception e) {log.error(e);return 0;}return 1;}@Overridepublic int dropTable(RestClient restClient, String database, String table){int flag = 0;try {if(checkTypeIsExist(restClient, database, table)){//索引或type存在String rs = EsRestClient.sendDelete(restClient,database,table);JSONObject jsonObj = JSON.parseObject(rs);if(jsonObj.getBooleanValue("acknowledged")){flag = 1;}}} catch (Exception e) {log.error(e);return flag;}return flag;}@Overridepublic void dropData(RestClient restClient, String database, String table, String id) {EsRestClient.sendDelete(restClient,database,table,id);}@Overridepublic void addData(String ip, int port, String database, String table,String objId, String data) {EsRestClient.sendPut(ip, port, database, table, objId, data);}}
测试
public Boolean addMapping2(String index, String type,List<MyDataTableCreateBean> colList) {String es_ip = CommonResource.cloud_es_dm_ip;int es_port = CommonResource.cloud_es_dm_tcp_port;try{//this.myDataElasticSearchDriver.createTable(es_ip, CommonResource.cloud_es_dm_rest_port, index, type, colList);List<String> databases=new ArrayList<>();databases.add("cars");List<String> tables=new ArrayList<>();tables.add("transactions");List<String> fields=new ArrayList<>();fields.add("title");fields.add("cooperationWay");fields.add("type");String query="{\"match\":{\"title\":\"\"}}";String aggs="{\"colors\":{\"terms\":{ \"field\": \"color\"}}}";Integer returnType=0;Integer pageSize=0;int start=0;int offset=100;//RestData<Map<String, Object>> mapRestData = this.myDataElasticSearchDriver.searchData(es_ip, CommonResource.cloud_es_dm_rest_port, index, type, databases, tables,fields,query,sort,start,offset);StatDataBean statDataBean = this.myDataElasticSearchDriver.statDimension1(es_ip, CommonResource.cloud_es_dm_rest_port, index, type, databases, tables, aggs, "", returnType, pageSize);for(String s :tables)System.out.println(s);return true;}catch (Exception e){return false;}}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: