对ElasticSearch读写操作的封装
2016-06-30 12:43
351 查看
1 依赖的jar包:
2 Es环境配置:elasticsearch.yml
3 封装的客户端工具类 ESClient
4 测试类 ESClientTest
<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch</artifactId>
    <version>1.7.1</version>
</dependency>
<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.4</version>
</dependency>
2 Es环境配置:elasticsearch.yml
cluster.name: ${es.cluster.name}
node.name: "${es.node.name}"
client.transport.ip: ${es.client.transport.ip}
client.transport.port: ${es.client.transport.port}
3 封装的客户端工具类 ESClient
import com.alibaba.fastjson.JSONObject; import org.elasticsearch.action.bulk.BulkRequestBuilder; import org.elasticsearch.action.bulk.BulkResponse; import org.elasticsearch.action.delete.DeleteRequestBuilder; import org.elasticsearch.action.delete.DeleteResponse; import org.elasticsearch.action.get.GetResponse; import org.elasticsearch.action.index.IndexRequest; import org.elasticsearch.action.index.IndexRequestBuilder; import org.elasticsearch.action.index.IndexResponse; import org.elasticsearch.action.search.SearchRequestBuilder; import org.elasticsearch.action.search.SearchResponse; import org.elasticsearch.client.transport.TransportClient; import org.elasticsearch.common.settings.ImmutableSettings; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.transport.InetSocketTransportAddress; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.search.SearchHit; import org.elasticsearch.search.SearchHits; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.*; public class ESClient { private static final Logger log = LoggerFactory.getLogger(ESClient.class); private TransportClient client; public ESClient() { this.init(); } private void close() { if (this.client != null) { this.client.close(); } } @Override public void finalize() throws Throwable { this.close(); super.finalize(); } private void init() { try { Settings settings = ImmutableSettings.settingsBuilder().loadFromClasspath("elasticsearch.yml") .put("client.transport.sniff", true).build(); this.client = new TransportClient(settings); int port = settings.getAsInt("client.transport.port", 9900); String[] ips = settings.getAsArray("client.transport.ip"); for (String ip : ips) { log.info("the ip is:" + ip); client.addTransportAddress(new InetSocketTransportAddress(ip, port)); } log.info("es连接成功:{},{}", client, JSONObject.toJSONString(client.listedNodes())); } catch (Exception e) { if 
(client != null) { client.close(); } log.error("连接es失败!", e); } } /** * 为一份文档建立索引 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param json json格式的数据集,必须含有属性"id" * @return */ public IndexResponse indexDoc(String index, String type, String json) throws Exception { JSONObject kvMap = JSONObject.parseObject(json); return this.indexDoc(index, type, kvMap); } /** * 为一份文档建立索引 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param kvMap 键值对形式的数据集,map中必须有属性key: "id" * @return */ public IndexResponse indexDoc(String index, String type, Map<String, Object> kvMap) throws Exception { if (!kvMap.containsKey("id")) { throw new Exception("创建索引时,传入的map或json串中没有属性'id'! "); } String id = (String) kvMap.get("id"); if (id == null) { throw new Exception("创建索引时,传入的map或json的属性'id'的值为null! "); } IndexRequestBuilder builder = client.prepareIndex(index, type, id); IndexResponse response = builder.setSource(kvMap) .execute() .actionGet(); return response; } /** * 为多份文档建立索引 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param jsons json格式的数据集,其下json串必须有属性"id" * @return */ public BulkResponse batchIndexDocsForJson(String index, String type, List<String> jsons) throws Exception { if (jsons.isEmpty()) { throw new Exception("批量创建索引时,传入的参数'jsons'为空!"); } List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(jsons.size()); for (String json : jsons) { JSONObject kvMap = JSONObject.parseObject(json); kvList.add(kvMap); } BulkResponse response = this.batchIndexDocsForMap(index, type, kvList); kvList.clear(); return response; } /** * 为多份文档建立索引 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param kvList 键值对形式的数据集,其下map中必须有属性key: "id" * @return */ public BulkResponse batchIndexDocsForMap(String index, String type, List<Map<String, Object>> kvList) throws Exception { if (kvList.isEmpty()) { throw new Exception("批量创建索引时,传入的参数'kvList'为空!"); } List<IndexRequest> requestList = new 
ArrayList<IndexRequest>(kvList.size()); for (Map<String, Object> kvMap : kvList) { if (!kvMap.containsKey("id")) { throw new Exception("批量创建索引时,传入的map或json串中没有属性'id'! "); } String id = (String) kvMap.get("id"); if (id == null) { throw new Exception("批量创建索引时,传入的map或json的属性'id'的值为null! "); } IndexRequest request = client .prepareIndex(index, type, id).setSource(kvMap) .request(); requestList.add(request); } BulkRequestBuilder bulkRequest = client.prepareBulk(); for (IndexRequest request : requestList) { bulkRequest.add(request); } BulkResponse response = bulkRequest .execute() .actionGet(); return response; } /** * 删除一个文档 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param id 键值对形式的数据集 * @return */ public DeleteResponse deleteDoc(String index, String type, String id) throws InterruptedException { DeleteRequestBuilder builder = client.prepareDelete(index, type, id); DeleteResponse response = builder .execute() .actionGet(); return response; } /** * 根据条件删除多个文档 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param queryBuilder 查询器 * @return */ public void deleteDocsByQuery(String index, String type, QueryBuilder queryBuilder) { client.prepareDeleteByQuery(index).setTypes(type).setQuery(queryBuilder) .execute() .actionGet(); } /** * 指定id获取文档 * * @param index 索引名,相当于关系型数据库的库名 * @param type 文档类型,相当于关系型数据库的表名 * @param id 文档id * @return */ public Map<String, Object> getDoc(String index, String type, String id) { GetResponse response = client.prepareGet(index, type, id) .execute() .actionGet(); Map<String, Object> retMap = response.getSourceAsMap(); return retMap; } public List<Map<String, Object>> search(String index, String type, QueryBuilder queryBuilder, FilterBuilder filterBuilder) { SearchRequestBuilder builder = client.prepareSearch(index).setTypes(type); if (queryBuilder != null) { builder = builder.setQuery(queryBuilder); } if (filterBuilder != null) { builder = builder.setPostFilter(filterBuilder); } SearchResponse 
searchResponse = builder.execute().actionGet(); SearchHits hits = searchResponse.getHits(); log.info("Es Hits count: " + hits.getTotalHits()); List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(); SearchHit[] hitArray = hits.getHits(); if (hitArray.length > 0) { for (SearchHit hit : hitArray) { Map<String, Object> kvMap = hit.getSource(); kvMap.put("id", hit.getId()); kvList.add(kvMap); } } return kvList; } }
4 测试类 ESClientTest
import com.alibaba.fastjson.JSONObject; import org.elasticsearch.index.query.FilterBuilder; import org.elasticsearch.index.query.FilterBuilders; import org.elasticsearch.index.query.QueryBuilder; import org.elasticsearch.index.query.QueryBuilders; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; public class ESClientTest { private static final Logger log = LoggerFactory.getLogger(ESClientTest.class); @Test public void testIndexDoc() { ESClient client = new ESClient(); try { // 创建索引 Map<String, Object> kvMap = new HashMap<String, Object>(); kvMap.put("name", "hu"); kvMap.put("age", "30"); kvMap.put("gender", "f"); kvMap.put("id", "33sdfa"); client.indexDoc("test", "test_user", kvMap); // 创建索引 String json = "{\"id\":\"55\",\"bb\":\"bbs\"}"; client.indexDoc("test", "test_user", json); Thread.sleep(3000); // 查询结果 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, null); for (Map<String, Object> kv : kvList) { System.out.println(JSONObject.toJSONString(kv)); } } catch (Exception e) { e.printStackTrace(); } } @Test public void testBatchIndexDocsForJson() { String json1 = "{\"id\":\"1\",\"ak\":\"av\",\"bk\":\"bv\"}"; String json2 = "{\"id\":\"2\",\"ak\":\"av2\",\"bk\":\"bv2\"}"; String json3 = "{\"id\":\"3\",\"ak\":\"av3\",\"bk\":\"bv3\"}"; List<String> jsonList = new ArrayList<String>(); jsonList.add(json1); jsonList.add(json2); jsonList.add(json3); ESClient client = new ESClient(); try { // 创建索引 client.batchIndexDocsForJson("test", "test_hw", jsonList); Thread.sleep(3000); // 查询结果 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); List<Map<String, Object>> kvList = client.search("test", "test_hw", queryBuilder, null); for (Map<String, Object> kv : kvList) { System.out.println(JSONObject.toJSONString(kv)); } } catch (Exception e) { 
e.printStackTrace(); } } @Test public void testBatchIndexDocsForMap() { Map<String, Object> kvMap = new HashMap<String, Object>(); kvMap.put("id", "3"); kvMap.put("name", "hu"); kvMap.put("age", "30"); kvMap.put("gender", "f"); kvMap.put("asdf", "33"); Map<String, Object> kvMap2 = new HashMap<String, Object>(); kvMap2.put("id", "2"); kvMap2.put("name", "wang"); kvMap2.put("age", "35"); kvMap2.put("gender", "f"); List<Map<String, Object>> kvList = new ArrayList<Map<String, Object>>(); kvList.add(kvMap); kvList.add(kvMap2); ESClient client = new ESClient(); try { // 创建索引 client.batchIndexDocsForMap("test", "test_user", kvList); Thread.sleep(3000); // 查询结果 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); kvList = client.search("test", "test_user", queryBuilder, null); for (Map<String, Object> kv : kvList) { System.out.println(JSONObject.toJSONString(kv)); } } catch (Exception e) { e.printStackTrace(); } } @Test public void testSearch() { ESClient client = new ESClient(); try { QueryBuilder queryBuilder = QueryBuilders.termQuery("gender", "f"); FilterBuilder filterBuilder = FilterBuilders.rangeFilter("age").from(32); List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, filterBuilder); for (Map<String, Object> kv : kvList) { System.out.println(JSONObject.toJSONString(kv)); } } catch (Exception e) { e.printStackTrace(); } } @Test public void testDeleteDoc() { ESClient client = new ESClient(); try { client.deleteDoc("test", "test_user", "1"); Thread.sleep(3000); // 查询结果 QueryBuilder queryBuilder = QueryBuilders.matchAllQuery(); List<Map<String, Object>> kvList = client.search("test", "test_hw", queryBuilder, null); for (Map<String, Object> kv : kvList) { System.out.println(JSONObject.toJSONString(kv)); } } catch (Exception e) { e.printStackTrace(); } } @Test public void testDeleteDocs() { ESClient client = new ESClient(); try { // 删除文档 QueryBuilder queryBuilder = QueryBuilders.termQuery("ak", "av2"); 
client.deleteDocsByQuery("test", "test_user", queryBuilder); // 查询结果 queryBuilder = QueryBuilders.matchAllQuery(); List<Map<String, Object>> kvList = client.search("test", "test_user", queryBuilder, null); for (Map<String, Object> kv : kvList) { System.out.println(JSONObject.toJSONString(kv)); } queryBuilder = QueryBuilders.matchAllQuery(); client.deleteDocsByQuery("test", "test_user", queryBuilder); queryBuilder = QueryBuilders.matchAllQuery(); client.deleteDocsByQuery("test", "test_hw", queryBuilder); } catch (Exception e) { e.printStackTrace(); } } }
相关文章推荐
- Android实训案例(三)——实现时间轴效果的ListView,加入本地存储,实现恋爱日记的效果!
- angular.js简单入门。
- mapreduce(二):wordcount详解
- linux打包压缩命令
- 原码, 反码, 补码 详解
- 下拉框回显
- 回调函数的应用
- Android实训案例(二)——Android下的CMD命令之关机重启以及重启recovery
- 复旦大学2015--2016学年第二学期(15级)高等代数II期末考试第七大题解答
- Zabbix Server参数文件详解
- Coursera课程《Machine Learning》学习笔记(week1)
- 指尖下的js ——多触式web前端开发之一:对于Touch的处理
- [从头读历史] 第278节 诗经 曹风
- 随机一个图片
- 在eclipse下如何把jar包加入到jre环境下
- Hadoop 2.7.1 集群搭建 基于CentOS 6.2
- git config命令使用
- 监听手机来电
- win10系统下MySQL 5.7默认数据库位置的问题
- 多线程_多生产者多消费者的实现问题