Java实现对ES数据的新增,删除,修改,及合并
2022-01-14 15:01
260 查看
Java实现对ES数据的新增,删除,修改,及合并
新增数据
代码:
@Autowired private RestHighLevelClient client; /** * @description ES写入数据 * @author zae * @date 2022/1/13 14:40 * @param index 索引库 * @param dataList 数据集合(size为插入数据的条数) */ public void insertEsData(String index,List<Map<String,Object>> dataList) { BulkProcessor bulkProcessor = null; try { bulkProcessor = getBulkProcessor(client); for(Map<String,Object> dataMap:dataList){ bulkProcessor.add(new IndexRequest(index).source(dataMap)); } // 将数据刷新到ES中 bulkProcessor.flush(); } catch (Exception e) { e.printStackTrace(); } finally { try { boolean terminatedFlag = bulkProcessor.awaitClose(150L, TimeUnit.SECONDS); } catch (InterruptedException e) { e.printStackTrace(); } } } private BulkProcessor getBulkProcessor(RestHighLevelClient client) { BulkProcessor bulkProcessor = null; try { BulkProcessor.Listener listener = new BulkProcessor.Listener() { @Override public void beforeBulk(long executionId, BulkRequest request) { logger.info("Try to insert data number : " + request.numberOfActions()); } @Override public void afterBulk(long executionId, BulkRequest request, BulkResponse response) { logger.info("************** Success insert data number : " + request.numberOfActions() + " , id: " + executionId); } @Override public void afterBulk(long executionId, BulkRequest request, Throwable failure) { logger.error("Bulk is unsuccess : " + failure + ", executionId: " + executionId); } }; BiConsumer<BulkRequest, ActionListener<BulkResponse>> bulkConsumer = (request, bulkListener) -> client .bulkAsync(request, RequestOptions.DEFAULT, bulkListener); BulkProcessor.Builder builder = BulkProcessor.builder(bulkConsumer, listener); builder.setBulkActions(5000);//刷新条数 builder.setBulkSize(new ByteSizeValue(100L, ByteSizeUnit.MB));// 刷新大小 builder.setConcurrentRequests(10);//并发线程数 builder.setFlushInterval(TimeValue.timeValueSeconds(100L));// 时间频率 builder.setBackoffPolicy(BackoffPolicy.constantBackoff(TimeValue.timeValueSeconds(1L), 3));//重试补偿策略 bulkProcessor = builder.build(); } catch (Exception e) { e.printStackTrace(); try { bulkProcessor.awaitClose(100L, TimeUnit.SECONDS); } catch (Exception e1) { logger.error(e1.getMessage()); } } return bulkProcessor; }
注意点:
传入数据中的List中的每个Map为一条数据,Map中的key为字段Field,value为值。
假如在插入数据前index库中没有定义Field及映射关系,在插入数据时会自动新增字段Field,字段的类型会默认映射为value值的类型。
假如新插入的数据和之前第一次插入的数据类型不一致,(相同的key但是value的类型不一样),这样在执行代码时可能不会报错失败,但是实际上并没有插入数据成功。
删除数据
代码
/** * @description ES数据删除 * @author zae * @date 2022/1/13 17:14 * @param index 索引库 * @param id 数据id */ public void delete(String index,String id){ DeleteRequest deleteRequest = new DeleteRequest(index,id); DeleteResponse response = null; try { response = client.delete(deleteRequest,RequestOptions.DEFAULT); } catch (IOException e) { e.printStackTrace(); } System.out.println(response); }
删除的话直接指定索引库和要删除的数据的id进行删除,至于数据的id怎么获取,参考下面更新数据中的代码。
更新数据
直接修改
/** * @description ES数据更新 * @author zae * @date 2022/1/13 16:10 * @param index 索引库 * @param key 字段名 * @param value 更新后的值 * @param id 需要修改的那条数据的id */ public void update(String index,String key,Object value,String id){ try { UpdateRequest request = new UpdateRequest(); request.index(index) //索引名 .id(id)//id .doc( XContentFactory.jsonBuilder() .startObject() .field(key, value)//要修改的字段 及字段值 .endObject() ); UpdateResponse response= client.update(request,RequestOptions.DEFAULT); System.out.println(response.status()); } catch (Exception e) { e.printStackTrace(); } }
更新数据需要传入指定的索引库,需要修改的字段名称,修改后的value值,以及代表那条数据的唯一id
关于唯一id怎么获得,可以参考以下代码。
public void selectAndUpdate(){ int count = 0; //step1:根据条件获取需要修改的数据的id SearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "name"); // step2:遍历更新查询出来的数据 for (SearchHit searchHit : searchHits) { // 获取到单条记录的id String id = searchHit.getId(); //调用更新数据的核心方法 esDeal.update(PRODUCT_DEV_INDEX,"name","张三",id); count++; } System.out.println("一共更新了"+count+"条数据"); } public SearchHit[] selectIdByKey(String index,String key){ BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery(); boolQueryBuilder.must(QueryBuilders.existsQuery(key)); SearchSourceBuilder sourceBuilder = new SearchSourceBuilder(); sourceBuilder.query(boolQueryBuilder) .trackTotalHits(true) .size(22); SearchRequest searchRequest = new SearchRequest() .indices(index) .source(sourceBuilder); try { SearchResponse searchResponse = client.search(searchRequest, RequestOptions.DEFAULT); SearchHit[] hits = searchResponse.getHits().getHits(); return hits; } catch (IOException e) { e.printStackTrace(); } return null; }
数据合并
数据合并其实也是更新的一种,关键在于怎么提取及封装需要更新的数据,根据业务场景的不同达到合并的效果,以上面的图片为例,我需要将以上数据的20004.value里面的值合并上新的数据却也保留着原有的数据。
@Test public void selectAndUpdate(){ int count = 0; //step1:根据条件获取需要修改的数据的id SearchHit[] searchHits = esDeal.selectIdByKey(PRODUCT_DEV_INDEX, "20004.value"); for (SearchHit searchHit : searchHits) { String id = searchHit.getId(); // step2:组装数据 Map<String, Object> sourceAsMap = searchHit.getSourceAsMap(); List<Map<String,Object>> mapList = (List<Map<String, Object>>) sourceAsMap.get("20004"); Map<String, Object> mapData = mapList.get(0); // 获取之前20004.value的数据,并添加上新的数据 List value = (List) mapData.get("value"); value.add("120021"); value.add("3990993"); mapData.put("value",value); // step3:根据id更新数据 esDeal.update(PRODUCT_DEV_INDEX,"20004",mapList,id); count++; } System.out.println("一共更新了"+count+"条数据"); }
备注:
- 代码中调用的selectIdByKey()以及update()在更新数据的第一部分都有代码,直接使用就好。根据以上代码更新后的数据为:
- 在更新20004.value的值时,不需要保持和20004.value的原有数据类型一致,更新成其他类型也是可以的,这点和插入key:value的数据是有区别的。
相关文章推荐
- java使用dbcp连接池实现jdbc动态新增,修改,删除,切换数据库源
- JAVA实现单链表的新增 插入 删除 修改
- java中想实现一个类存储数据,存储时可以像数组一样有数据的顺序,可以有下标方便获取数据 还可以无限存储,可修改可删除。————————泛型
- Zookeeper客户端基本操作java实现——创建连接、创建节点、添加修改节点内容、获取子节点、获取节点数据、删除节点
- 用java实现对MySql数据库中数据的读取、插入、修改和删除
- 用Java实现数据的显示,增加,删除,修改
- 利用Access实现Excel中多表合并,并删除重复数据的问题
- JAVA_WEB项目(结合Servlet+jsp+ckEditor编辑器+jquery easyui技术)实现新闻发布管理系统第三篇:新闻发布,新闻修改,新闻删除功能的实现
- 用Nhibernate怎么实现数据的添加、删除、修改简单程序
- 稳扎稳打Silverlight(58) - 4.0通信之WCF RIA Services: 通过 Domain Service, 以 MVVM 模式实现数据的添加、删除、修改和查询
- <java><JTable>使用AbstractTableModel实现更新、删除、插入数据
- 【数据结构】顺序线性表的插入、删除、合并实现
- 比较两个DataTable数据(结构相同),返回新增的,删除的,修改前的,修改后的 DataTable
- 用Nhibernate怎么实现数据的添加、删除、修改简单程序
- java实现hbase表创建、数据插入、删除表
- java实现hbase表创建、数据插入、删除表
- java jsp js 实现地区表 三级联动,并修改时数据回显
- Java插入修改删除数据库数据的基本方法
- 基于JAVA的图书数据库管理-具有新增,修改,删除,查询功能
- 用java实现的双向链表增加删除修改操作