您的位置:首页 > 其它

对ElasticSearch读写操作的封装

2016-06-30 12:43 351 查看
1 依赖的jar包:

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>1.7.1</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.4</version>
</dependency>

2 ES环境配置:elasticsearch.yml(需放在classpath根目录下,ESClient通过loadFromClasspath加载)

cluster.name: ${es.cluster.name}
node.name: "${es.node.name}"

client.transport.ip: ${es.client.transport.ip}
client.transport.port: ${es.client.transport.port}

3 封装的客户端工具类 ESClient

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequestBuilder;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexRequestBuilder;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.ImmutableSettings;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.*;

/**
 * Thin convenience wrapper around an Elasticsearch {@link TransportClient}
 * offering index, bulk-index, delete, get and search helpers.
 *
 * <p>The connection is configured from {@code elasticsearch.yml} on the classpath
 * (keys {@code client.transport.ip} and {@code client.transport.port}).
 * Call {@link #close()} (or use try-with-resources) when the client is no longer needed.
 */
public class ESClient implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(ESClient.class);

    /** Underlying transport client; {@code null} when initialisation failed or after {@link #close()}. */
    private TransportClient client;

    /**
     * Creates the wrapper and tries to connect. A connection failure is logged, not thrown;
     * in that case every later operation fails with an {@link IllegalStateException}.
     */
    public ESClient() {
        this.init();
    }

    /**
     * Releases the underlying transport client. Safe to call more than once.
     */
    @Override
    public void close() {
        if (this.client != null) {
            this.client.close();
            this.client = null;
        }
    }

    /**
     * Last-resort cleanup for callers that never invoke {@link #close()}.
     * Do not rely on it: prefer closing explicitly.
     */
    @Override
    public void finalize() throws Throwable {
        try {
            this.close();
        } finally {
            super.finalize();
        }
    }

    /**
     * Loads {@code elasticsearch.yml} from the classpath, builds the transport client and
     * registers every configured node address.
     */
    private void init() {
        try {
            Settings settings = ImmutableSettings.settingsBuilder().loadFromClasspath("elasticsearch.yml")
                    .put("client.transport.sniff", true).build();
            this.client = new TransportClient(settings);

            // NOTE(review): fallback port is 9900 while the usual transport port is 9300 — confirm intent.
            int port = settings.getAsInt("client.transport.port", 9900);
            String[] ips = settings.getAsArray("client.transport.ip");
            if (ips.length == 0) {
                log.warn("no 'client.transport.ip' configured in elasticsearch.yml; the client has no node to talk to");
            }

            for (String ip : ips) {
                log.info("the ip is:{}", ip);
                client.addTransportAddress(new InetSocketTransportAddress(ip, port));
            }
            log.info("es连接成功:{},{}", client, JSONObject.toJSONString(client.listedNodes()));

        } catch (Exception e) {
            if (client != null) {
                client.close();
                // Drop the closed instance so later calls fail with a clear message.
                client = null;
            }
            log.error("连接es失败!", e);
        }
    }

    /**
     * Returns the live transport client.
     *
     * @throws IllegalStateException if initialisation failed or the client was closed
     */
    private TransportClient requireClient() {
        if (client == null) {
            throw new IllegalStateException("ES client is not available (initialisation failed or client closed)");
        }
        return client;
    }

    /**
     * Validates a document map and extracts its mandatory {@code "id"} entry as a string.
     * Non-string ids (e.g. a JSON number) are converted with {@link String#valueOf(Object)}.
     *
     * @param kvMap  document fields
     * @param action message prefix identifying the calling operation
     * @return the document id
     * @throws IllegalArgumentException if the map is null, has no {@code "id"} key or the id is null
     */
    private static String extractId(Map<String, Object> kvMap, String action) {
        if (kvMap == null || !kvMap.containsKey("id")) {
            throw new IllegalArgumentException(action + ",传入的map或json串中没有属性'id'! ");
        }
        Object idValue = kvMap.get("id");
        if (idValue == null) {
            throw new IllegalArgumentException(action + ",传入的map或json的属性'id'的值为null! ");
        }
        return String.valueOf(idValue);
    }

    /**
     * Indexes a single document given as a JSON string.
     *
     * @param index index name (comparable to a database name)
     * @param type  document type (comparable to a table name)
     * @param json  JSON object; must contain a non-null {@code "id"} property
     * @return the index response
     * @throws IllegalArgumentException if the JSON is empty or lacks a usable {@code "id"}
     */
    public IndexResponse indexDoc(String index, String type, String json) throws Exception {
        JSONObject kvMap = JSONObject.parseObject(json);
        if (kvMap == null) {
            throw new IllegalArgumentException("indexDoc: parameter 'json' is null or empty");
        }
        return this.indexDoc(index, type, kvMap);
    }

    /**
     * Indexes a single document given as a key/value map. The document id is taken
     * from the map entry {@code "id"}.
     *
     * @param index index name (comparable to a database name)
     * @param type  document type (comparable to a table name)
     * @param kvMap document fields; must contain a non-null {@code "id"} entry
     * @return the index response
     * @throws IllegalArgumentException if the map lacks a usable {@code "id"}
     */
    public IndexResponse indexDoc(String index, String type, Map<String, Object> kvMap)
            throws Exception {
        String id = extractId(kvMap, "创建索引时");

        IndexRequestBuilder builder = requireClient().prepareIndex(index, type, id);
        return builder.setSource(kvMap)
                .execute()
                .actionGet();
    }

    /**
     * Indexes several documents, each given as a JSON string, in one bulk request.
     *
     * @param index index name (comparable to a database name)
     * @param type  document type (comparable to a table name)
     * @param jsons JSON objects; each must contain a non-null {@code "id"} property
     * @return the bulk response
     * @throws IllegalArgumentException if the list is null/empty or a document lacks a usable {@code "id"}
     */
    public BulkResponse batchIndexDocsForJson(String index, String type, List<String> jsons)
            throws Exception {
        if (jsons == null || jsons.isEmpty()) {
            throw new IllegalArgumentException("批量创建索引时,传入的参数'jsons'为空!");
        }

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(jsons.size());
        for (String json : jsons) {
            // A null/empty JSON string parses to null and is rejected by the id validation downstream.
            kvList.add(JSONObject.parseObject(json));
        }

        return this.batchIndexDocsForMap(index, type, kvList);
    }

    /**
     * Indexes several documents, each given as a key/value map, in one bulk request.
     * All documents are validated before anything is sent. Per-item failures reported by
     * Elasticsearch are logged as a warning and remain available on the returned response.
     *
     * @param index  index name (comparable to a database name)
     * @param type   document type (comparable to a table name)
     * @param kvList documents; each map must contain a non-null {@code "id"} entry
     * @return the bulk response
     * @throws IllegalArgumentException if the list is null/empty or a document lacks a usable {@code "id"}
     */
    public BulkResponse batchIndexDocsForMap(String index, String type, List<Map<String, Object>> kvList)
            throws Exception {
        if (kvList == null || kvList.isEmpty()) {
            throw new IllegalArgumentException("批量创建索引时,传入的参数'kvList'为空!");
        }

        TransportClient es = requireClient();
        BulkRequestBuilder bulkRequest = es.prepareBulk();
        for (Map<String, Object> kvMap : kvList) {
            String id = extractId(kvMap, "批量创建索引时");
            IndexRequest request = es
                    .prepareIndex(index, type, id).setSource(kvMap)
                    .request();
            bulkRequest.add(request);
        }

        BulkResponse response = bulkRequest
                .execute()
                .actionGet();
        if (response.hasFailures()) {
            log.warn("bulk index into {}/{} reported failures: {}", index, type, response.buildFailureMessage());
        }
        return response;
    }

    /**
     * Deletes a single document by id.
     *
     * @param index index name (comparable to a database name)
     * @param type  document type (comparable to a table name)
     * @param id    document id
     * @return the delete response
     */
    public DeleteResponse deleteDoc(String index, String type, String id) throws InterruptedException {
        DeleteRequestBuilder builder = requireClient().prepareDelete(index, type, id);
        return builder
                .execute()
                .actionGet();
    }

    /**
     * Deletes every document of the given type that matches the query.
     *
     * @param index        index name (comparable to a database name)
     * @param type         document type (comparable to a table name)
     * @param queryBuilder query selecting the documents to delete
     */
    public void deleteDocsByQuery(String index, String type, QueryBuilder queryBuilder) {
        requireClient().prepareDeleteByQuery(index).setTypes(type).setQuery(queryBuilder)
                .execute()
                .actionGet();
    }

    /**
     * Fetches a document by id.
     *
     * @param index index name (comparable to a database name)
     * @param type  document type (comparable to a table name)
     * @param id    document id
     * @return the document source as a map, exactly as returned by
     *         {@code GetResponse.getSourceAsMap()} (may be {@code null})
     */
    public Map<String, Object> getDoc(String index, String type, String id) {
        GetResponse response = requireClient().prepareGet(index, type, id)
                .execute()
                .actionGet();

        return response.getSourceAsMap();
    }

    /**
     * Runs a search and returns the source of every hit in the response, with the hit id
     * stored under the key {@code "id"}. No result size is set, so only the hits contained
     * in the server's default response page are returned.
     *
     * @param index         index name (comparable to a database name)
     * @param type          document type (comparable to a table name)
     * @param queryBuilder  query to run; skipped when {@code null}
     * @param filterBuilder post filter to apply; skipped when {@code null}
     * @return one map per hit; empty when nothing matched
     */
    public List<Map<String, Object>> search(String index, String type, QueryBuilder queryBuilder, FilterBuilder filterBuilder) {
        SearchRequestBuilder builder = requireClient().prepareSearch(index).setTypes(type);
        if (queryBuilder != null) {
            builder = builder.setQuery(queryBuilder);
        }
        if (filterBuilder != null) {
            builder = builder.setPostFilter(filterBuilder);
        }

        SearchResponse searchResponse = builder.execute().actionGet();

        SearchHits hits = searchResponse.getHits();
        log.info("Es Hits count: {}", hits.getTotalHits());

        SearchHit[] hitArray = hits.getHits();
        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(hitArray.length);
        for (SearchHit hit : hitArray) {
            Map<String, Object> kvMap = hit.getSource();
            if (kvMap == null) {
                // A hit without a stored source still yields an entry carrying its id.
                kvMap = new HashMap<String, Object>();
            }
            kvMap.put("id", hit.getId());
            kvList.add(kvMap);
        }
        return kvList;
    }
}

4 测试类 ESClientTest

import com.alibaba.fastjson.JSONObject;
import org.elasticsearch.index.query.FilterBuilder;
import org.elasticsearch.index.query.FilterBuilders;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
 * Integration-style tests for {@link ESClient}. They need a reachable Elasticsearch
 * cluster configured through {@code elasticsearch.yml} on the classpath.
 *
 * <p>Exceptions propagate to JUnit so that a failing operation fails the test
 * instead of only printing a stack trace.
 */
public class ESClientTest {

    private static final Logger log = LoggerFactory.getLogger(ESClientTest.class);

    private static final String INDEX = "test";
    private static final String TYPE_USER = "test_user";
    private static final String TYPE_HW = "test_hw";

    /** Pause that gives the index time to make freshly written changes searchable. */
    private static final long REFRESH_WAIT_MILLIS = 3000L;

    /** Prints every document of a search result as a JSON line. */
    private static void printAll(List<Map<String, Object>> kvList) {
        log.info("search returned {} document(s)", kvList.size());
        for (Map<String, Object> kv : kvList) {
            System.out.println(JSONObject.toJSONString(kv));
        }
    }

    /** Indexes one document from a map and one from JSON, then lists the type. */
    @Test
    public void testIndexDoc() throws Exception {
        ESClient client = new ESClient();

        // Index from a map.
        Map<String, Object> kvMap = new HashMap<String, Object>();
        kvMap.put("name", "hu");
        kvMap.put("age", "30");
        kvMap.put("gender", "f");
        kvMap.put("id", "33sdfa");
        client.indexDoc(INDEX, TYPE_USER, kvMap);

        // Index from a JSON string.
        String json = "{\"id\":\"55\",\"bb\":\"bbs\"}";
        client.indexDoc(INDEX, TYPE_USER, json);

        Thread.sleep(REFRESH_WAIT_MILLIS);

        // Query the result.
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        printAll(client.search(INDEX, TYPE_USER, queryBuilder, null));
    }

    /** Bulk-indexes three JSON documents, then lists the type. */
    @Test
    public void testBatchIndexDocsForJson() throws Exception {
        String json1 = "{\"id\":\"1\",\"ak\":\"av\",\"bk\":\"bv\"}";
        String json2 = "{\"id\":\"2\",\"ak\":\"av2\",\"bk\":\"bv2\"}";
        String json3 = "{\"id\":\"3\",\"ak\":\"av3\",\"bk\":\"bv3\"}";

        List<String> jsonList = new ArrayList<String>();
        jsonList.add(json1);
        jsonList.add(json2);
        jsonList.add(json3);

        ESClient client = new ESClient();
        client.batchIndexDocsForJson(INDEX, TYPE_HW, jsonList);

        Thread.sleep(REFRESH_WAIT_MILLIS);

        // Query the result.
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        printAll(client.search(INDEX, TYPE_HW, queryBuilder, null));
    }

    /** Bulk-indexes two map documents, then lists the type. */
    @Test
    public void testBatchIndexDocsForMap() throws Exception {
        Map<String, Object> kvMap = new HashMap<String, Object>();
        kvMap.put("id", "3");
        kvMap.put("name", "hu");
        kvMap.put("age", "30");
        kvMap.put("gender", "f");
        kvMap.put("asdf", "33");

        Map<String, Object> kvMap2 = new HashMap<String, Object>();
        kvMap2.put("id", "2");
        kvMap2.put("name", "wang");
        kvMap2.put("age", "35");
        kvMap2.put("gender", "f");

        List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>();
        kvList.add(kvMap);
        kvList.add(kvMap2);

        ESClient client = new ESClient();
        client.batchIndexDocsForMap(INDEX, TYPE_USER, kvList);

        Thread.sleep(REFRESH_WAIT_MILLIS);

        // Query the result.
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        printAll(client.search(INDEX, TYPE_USER, queryBuilder, null));
    }

    /** Searches with a term query combined with a range post filter. */
    @Test
    public void testSearch() {
        ESClient client = new ESClient();

        QueryBuilder queryBuilder = QueryBuilders.termQuery("gender", "f");
        FilterBuilder filterBuilder = FilterBuilders.rangeFilter("age").from(32);

        printAll(client.search(INDEX, TYPE_USER, queryBuilder, filterBuilder));
    }

    /** Deletes one document by id, then lists the type it was deleted from. */
    @Test
    public void testDeleteDoc() throws Exception {
        ESClient client = new ESClient();

        // Id "1" is indexed into test_hw by testBatchIndexDocsForJson, and the verification
        // below reads test_hw, so the delete must target the same type.
        client.deleteDoc(INDEX, TYPE_HW, "1");

        Thread.sleep(REFRESH_WAIT_MILLIS);

        // Query the result.
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        printAll(client.search(INDEX, TYPE_HW, queryBuilder, null));
    }

    /** Deletes documents by query, lists what is left, then empties both test types. */
    @Test
    public void testDeleteDocs() throws Exception {
        ESClient client = new ESClient();

        // Only the test_hw documents carry the field "ak", so that is the type to delete from.
        QueryBuilder queryBuilder = QueryBuilders.termQuery("ak", "av2");
        client.deleteDocsByQuery(INDEX, TYPE_HW, queryBuilder);

        Thread.sleep(REFRESH_WAIT_MILLIS);

        // Query the result.
        queryBuilder = QueryBuilders.matchAllQuery();
        printAll(client.search(INDEX, TYPE_HW, queryBuilder, null));

        // Clean up everything the tests created.
        client.deleteDocsByQuery(INDEX, TYPE_USER, QueryBuilders.matchAllQuery());
        client.deleteDocsByQuery(INDEX, TYPE_HW, QueryBuilders.matchAllQuery());
    }
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: