elasticsearch基本操作之--使用java操作elasticsearch
2017-02-23 13:07
483 查看
/**
* 系统环境: vm12 下的centos 7.2
* 当前安装版本: elasticsearch-2.4.0.tar.gz
*/
es 查询共有4种查询类型
QUERY_AND_FETCH:
主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。
这种查询方式存在数据量和排序两个问题:一是主节点会汇总所有分片返回的数据,数据量会比较大;二是各个分片上的打分规则(词频、文档频率)可能不一致。
QUERY_THEN_FETCH:
主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。
这种方式解决了数据量问题,但是排序问题依然存在。它是es的默认查询方式。
DFS_QUERY_AND_FETCH: 和 DFS_QUERY_THEN_FETCH:
先进行一轮初始分发,统计全局的词频和文档频率(DFS 阶段),从而将各个分片的打分规则统一起来再进行打分。这解决了排序问题,但是DFS_QUERY_AND_FETCH仍然存在数据量问题;DFS_QUERY_THEN_FETCH两种问题都解决了,但是效率是最差的。
1, 获取client, 两种方式获取
其他参数的意义:
代码:
test2代码:
* 系统环境: vm12 下的centos 7.2
* 当前安装版本: elasticsearch-2.4.0.tar.gz
*/
es 查询共有4种查询类型
QUERY_AND_FETCH:
主节点将查询请求分发到所有的分片中,各个分片按照自己的查询规则即词频文档频率进行打分排序,然后将结果返回给主节点,主节点对所有数据进行汇总排序然后再返回给客户端,此种方式只需要和es交互一次。
这种查询方式存在数据量和排序两个问题:一是主节点会汇总所有分片返回的数据,数据量会比较大;二是各个分片上的打分规则(词频、文档频率)可能不一致。
QUERY_THEN_FETCH:
主节点将请求分发给所有分片,各个分片打分排序后将数据的id和分值返回给主节点,主节点收到后进行汇总排序再根据排序后的id到对应的节点读取对应的数据再返回给客户端,此种方式需要和es交互两次。
这种方式解决了数据量问题,但是排序问题依然存在。它是es的默认查询方式。
DFS_QUERY_AND_FETCH: 和 DFS_QUERY_THEN_FETCH:
先进行一轮初始分发,统计全局的词频和文档频率(DFS 阶段),从而将各个分片的打分规则统一起来再进行打分。这解决了排序问题,但是DFS_QUERY_AND_FETCH仍然存在数据量问题;DFS_QUERY_THEN_FETCH两种问题都解决了,但是效率是最差的。
1, 获取client, 两种方式获取
/**
 * Creates the transport client (approach 1): the cluster name is handed to the
 * settings builder as a map, and the node is resolved from its host name.
 *
 * @throws Exception if the host name cannot be resolved
 */
@Before
public void before() throws Exception {
    Map<String, String> clusterConfig = new HashMap<String, String>();
    clusterConfig.put("cluster.name", "elasticsearch_wenbronk");
    Settings.Builder settingsBuilder = Settings.builder().put(clusterConfig);

    TransportClient transportClient = TransportClient.builder().settings(settingsBuilder).build();
    InetAddress nodeHost = InetAddress.getByName("www.wenbronk.com");
    int nodePort = Integer.parseInt("9300");
    client = transportClient.addTransportAddress(new InetSocketTransportAddress(nodeHost, nodePort));
}
@Before public void before11() throws Exception { // 创建客户端, 使用的默认集群名, "elasticSearch" // client = TransportClient.builder().build() // .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), 9300)); // 通过setting对象指定集群配置信息, 配置的集群名 Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch_wenbronk") // 设置集群名 // .put("client.transport.sniff", true) // 开启嗅探 , 开启后会一直连接不上, 原因未知 // .put("network.host", "192.168.50.37") .put("client.transport.ignore_cluster_name", true) // 忽略集群名字验证, 打开后集群名字不对也能连接上 // .put("client.transport.nodes_sampler_interval", 5) //报错, // .put("client.transport.ping_timeout", 5) // 报错, ping等待时间, .build(); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300))); // 默认5s // 多久打开连接, 默认5s System.out.println("success connect"); } PS: 官网给的2种方式都不能用, 需要合起来才能用, 浪费老子一下午...
其他参数的意义:
代码:
package com.wenbronk.javaes;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.util.Date;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.action.bulk.BulkProcessor.Listener;
import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkRequestBuilder;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.get.MultiGetItemResponse;
import org.elasticsearch.action.get.MultiGetResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.update.UpdateRequest;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.node.DiscoveryNode;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.script.Script;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.fastjson.JSONObject;
/**
* 使用java API操作elasticSearch
*
* @author 231
*
*/
public class JavaESTest {
private TransportClient client;
private IndexRequest source;
/**
* 获取连接, 第一种方式
* @throws Exception
*/
// @Before public void before() throws Exception { Map<String, String> map = new HashMap<String, String>(); map.put("cluster.name", "elasticsearch_wenbronk"); Settings.Builder settings = Settings.builder().put(map); client = TransportClient.builder().settings(settings).build() .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("www.wenbronk.com"), Integer.parseInt("9300"))); }
/**
* 查看集群信息
*/
@Test
public void testInfo() {
List<DiscoveryNode> nodes = client.connectedNodes();
for (DiscoveryNode node : nodes) {
System.out.println(node.getHostAddress());
}
}
/**
* 组织json串, 方式1,直接拼接
*/
public String createJson1() {
String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
return json;
}
/**
* 使用map创建json
*/
public Map<String, Object> createJson2() {
Map<String,Object> json = new HashMap<String, Object>();
json.put("user", "kimchy");
json.put("postDate", new Date());
json.put("message", "trying out elasticsearch");
return json;
}
/**
* 使用fastjson创建
*/
public JSONObject createJson3() {
JSONObject json = new JSONObject();
json.put("user", "kimchy");
json.put("postDate", new Date());
json.put("message", "trying out elasticsearch");
return json;
}
/**
* 使用es的帮助类
*/
public XContentBuilder createJson4() throws Exception {
// 创建json对象, 其中一个创建json的方式
XContentBuilder source = XContentFactory.jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying to out ElasticSearch")
.endObject();
return source;
}
/**
* 存入索引中
* @throws Exception
*/
@Test
public void test1() throws Exception {
XContentBuilder source = createJson4();
// 存json入索引中
IndexResponse response = client.prepareIndex("twitter", "tweet", "1").setSource(source).get();
// // 结果获取
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
long version = response.getVersion();
boolean created = response.isCreated();
System.out.println(index + " : " + type + ": " + id + ": " + version + ": " + created);
}
/**
* get API 获取指定文档信息
*/
@Test
public void testGet() {
// GetResponse response = client.prepareGet("twitter", "tweet", "1")
// .get();
GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false) // 线程安全
.get();
System.out.println(response.getSourceAsString());
}
/**
* 测试 delete api
*/
@Test
public void testDelete() {
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.get();
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
long version = response.getVersion();
System.out.println(index + " : " + type + ": " + id + ": " + version);
}
/**
* 测试更新 update API
* 使用 updateRequest 对象
* @throws Exception
*/
@Test
public void testUpdate() throws Exception {
UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("twitter");
updateRequest.type("tweet");
updateRequest.id("1");
updateRequest.doc(XContentFactory.jsonBuilder()
.startObject()
// 对没有的字段添加, 对已有的字段替换
.field("gender", "male")
.field("message", "hello")
.endObject());
UpdateResponse response = client.update(updateRequest).get();
// 打印
String index = response.getIndex();
String type = response.getType();
String id = response.getId();
long version = response.getVersion();
System.out.println(index + " : " + type + ": " + id + ": " + version);
}
/**
* 测试update api, 使用client
* @throws Exception
*/
@Test
public void testUpdate2() throws Exception {
// 使用Script对象进行更新
// UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
// .setScript(new Script("hits._source.gender = \"male\""))
// .get();
// 使用XContFactory.jsonBuilder() 进行更新
// UpdateResponse response = client.prepareUpdate("twitter", "tweet", "1")
// .setDoc(XContentFactory.jsonBuilder()
// .startObject()
// .field("gender", "malelelele")
// .endObject()).get();
// 使用updateRequest对象及script
// UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
// .script(new Script("ctx._source.gender=\"male\""));
// UpdateResponse response = client.update(updateRequest).get();
// 使用updateRequest对象及documents进行更新
UpdateResponse response = client.update(new UpdateRequest("twitter", "tweet", "1")
.doc(XContentFactory.jsonBuilder()
.startObject()
.field("gender", "male")
.endObject()
)).get();
System.out.println(response.getIndex());
}
/**
* 测试update
* 使用updateRequest
* @throws Exception
* @throws InterruptedException
*/
@Test
public void testUpdate3() throws InterruptedException, Exception {
UpdateRequest updateRequest = new UpdateRequest("twitter", "tweet", "1")
.script(new Script("ctx._source.gender=\"male\""));
UpdateResponse response = client.update(updateRequest).get();
}
/**
* 测试upsert方法
* @throws Exception
*
*/
@Test
public void testUpsert() throws Exception {
// 设置查询条件, 查找不到则添加生效
IndexRequest indexRequest = new IndexRequest("twitter", "tweet", "2")
.source(XContentFactory.jsonBuilder()
.startObject()
.field("name", "214")
.field("gender", "gfrerq")
.endObject());
// 设置更新, 查找到更新下面的设置
UpdateRequest upsert = new UpdateRequest("twitter", "tweet", "2")
.doc(XContentFactory.jsonBuilder()
.startObject()
.field("user", "wenbronk")
.endObject())
.upsert(indexRequest);
client.update(upsert).get();
}
/**
* 测试multi get api
* 从不同的index, type, 和id中获取
*/
@Test
public void testMultiGet() {
MultiGetResponse multiGetResponse = client.prepareMultiGet()
.add("twitter", "tweet", "1")
.add("twitter", "tweet", "2", "3", "4")
.add("anothoer", "type", "foo")
.get();
for (MultiGetItemResponse itemResponse : multiGetResponse) {
GetResponse response = itemResponse.getResponse();
if (response.isExists()) {
String sourceAsString = response.getSourceAsString();
System.out.println(sourceAsString);
}
}
}
/**
* bulk 批量执行
* 一次查询可以update 或 delete多个document
*/
@Test
public void testBulk() throws Exception {
BulkRequestBuilder bulkRequest = client.prepareBulk();
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()));
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(XContentFactory.jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()));
BulkResponse response = bulkRequest.get();
System.out.println(response.getHeaders());
}
/**
* 使用bulk processor
* @throws Exception
*/
@Test
public void testBulkProcessor() throws Exception {
// 创建BulkPorcessor对象
BulkProcessor bulkProcessor = BulkProcessor.builder(client, new Listener() {
public void beforeBulk(long paramLong, BulkRequest paramBulkRequest) {
// TODO Auto-generated method stub
}
// 执行出错时执行
public void afterBulk(long paramLong, BulkRequest paramBulkRequest, Throwable paramThrowable) {
// TODO Auto-generated method stub
}
public void afterBulk(long paramLong, BulkRequest paramBulkRequest, BulkResponse paramBulkResponse) {
// TODO Auto-generated method stub
}
})
// 1w次请求执行一次bulk
.setBulkActions(10000)
// 1gb的数据刷新一次bulk
.setBulkSize(new ByteSizeValue(1, ByteSizeUnit.GB))
// 固定5s必须刷新一次
.setFlushInterval(TimeValue.timeValueSeconds(5))
// 并发请求数量, 0不并发, 1并发允许执行
.setConcurrentRequests(1)
// 设置退避, 100ms后执行, 最大请求3次
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))
.build();
// 添加单次请求
bulkProcessor.add(new IndexRequest("twitter", "tweet", "1"));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));
// 关闭
bulkProcessor.awaitClose(10, TimeUnit.MINUTES);
// 或者
bulkProcessor.close();
}
}
test2代码:
package com.wenbronk.javaes;

import java.net.InetSocketAddress;

import org.apache.lucene.queryparser.xml.FilterBuilderFactory;
import org.elasticsearch.action.search.MultiSearchResponse;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.settings.Settings.Builder;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.aggregations.Aggregation;
import org.elasticsearch.search.aggregations.AggregationBuilders;
import org.elasticsearch.search.aggregations.bucket.histogram.DateHistogramInterval;
import org.elasticsearch.search.aggregations.bucket.histogram.Histogram;
import org.elasticsearch.search.aggregations.bucket.terms.Terms;
import org.elasticsearch.search.sort.SortOrder;
import org.elasticsearch.search.sort.SortParseElement;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;

/**
 * Demonstrates the search APIs of the Elasticsearch 2.x Java client: plain search, scroll,
 * multi-search, aggregations, terminate_after, post filters and terms grouping.
 *
 * @author 231
 */
public class JavaESTest2 {

    private TransportClient client;

    /**
     * Creates the transport client used by every test.
     */
    @Before
    public void testBefore() {
        Builder builder = Settings.settingsBuilder();
        builder.put("cluster.name", "wenbronk_escluster");
        // builder.put("client.transport.ignore_cluster_name", true);
        Settings settings = builder.build();
        TransportClient.Builder transportBuild = TransportClient.builder();
        TransportClient client1 = transportBuild.settings(settings).build();
        client = client1.addTransportAddress(
                new InetSocketTransportAddress(new InetSocketAddress("192.168.50.37", 9300)));
        System.out.println("success connect to escluster");
    }

    /**
     * Closes the transport client so its connections and threads are released between tests.
     */
    @After
    public void after() {
        if (client != null) {
            client.close();
        }
    }

    /**
     * Runs a search without index, type or query (match_all over the whole cluster) and
     * prints the raw response.
     */
    @Test
    public void testSearch() {
        // A fuller example; prepareSearch takes index names and setTypes takes type names:
        // SearchResponse response = client.prepareSearch("twitter")
        //         .setTypes("tweet")
        //         .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
        //         .setQuery(QueryBuilders.termQuery("user", "test"))
        //         .setPostFilter(QueryBuilders.rangeQuery("age").from(0).to(1))
        //         .setFrom(0).setSize(2).setExplain(true)
        //         .execute().actionGet();
        SearchResponse response = client.prepareSearch()
                .execute().actionGet();
        System.out.println(response);
    }

    /**
     * Scroll API: walks every document of twitter/tweet in pages of 100, which is more
     * efficient than deep from/size paging for large result sets.
     */
    @Test
    public void testScrolls() {
        // termQuery(field, value) expects a field name, so scan the index with match_all instead.
        QueryBuilder queryBuilder = QueryBuilders.matchAllQuery();
        SearchResponse response = client.prepareSearch("twitter")
                .setTypes("tweet")
                // _doc order is the cheapest sort for scrolling
                .addSort(SortParseElement.DOC_FIELD_NAME, SortOrder.ASC)
                .setScroll(new TimeValue(60000))
                .setQuery(queryBuilder)
                .setSize(100).execute().actionGet();
        while (true) {
            for (SearchHit hit : response.getHits().getHits()) {
                System.out.println(hit.getSourceAsString());
            }
            // Continue from the latest scroll id; without reassigning response the first page
            // would be processed over and over.
            response = client.prepareSearchScroll(response.getScrollId())
                    .setScroll(new TimeValue(60000)).execute().actionGet();
            if (response.getHits().getHits().length == 0) {
                System.out.println("oh no=====");
                break;
            }
        }
        // Free the server-side scroll context instead of waiting for it to time out.
        client.prepareClearScroll().addScrollId(response.getScrollId()).get();
    }

    /**
     * Multi-search API: sends two searches in one request and prints each hit count
     * followed by the total. Failed sub-searches have no response and are reported instead.
     */
    @Test
    public void testMultiSearch() {
        QueryBuilder qb1 = QueryBuilders.queryStringQuery("elasticsearch");
        SearchRequestBuilder requestBuilder1 = client.prepareSearch().setQuery(qb1).setSize(1);
        QueryBuilder qb2 = QueryBuilders.matchQuery("user", "kimchy");
        SearchRequestBuilder requestBuilder2 = client.prepareSearch().setQuery(qb2).setSize(1);
        MultiSearchResponse multiResponse = client.prepareMultiSearch()
                .add(requestBuilder1)
                .add(requestBuilder2)
                .execute().actionGet();
        long nbHits = 0;
        for (MultiSearchResponse.Item item : multiResponse.getResponses()) {
            if (item.isFailure()) {
                System.out.println("search failed: " + item.getFailureMessage());
                continue;
            }
            long hits = item.getResponse().getHits().getTotalHits();
            nbHits += hits;
            System.out.println(hits);
        }
        System.out.println("total hits: " + nbHits);
    }

    /**
     * Aggregations: a terms aggregation on "user" and a yearly date histogram on "birth",
     * printing each bucket key with its document count.
     */
    @Test
    public void testAggregation() {
        SearchResponse response = client.prepareSearch()
                .setQuery(QueryBuilders.matchAllQuery()) // the query narrows the documents first
                .addAggregation(AggregationBuilders.terms("term").field("user"))
                .addAggregation(AggregationBuilders.dateHistogram("agg2").field("birth")
                        .interval(DateHistogramInterval.YEAR))
                .execute().actionGet();
        Terms terms = response.getAggregations().get("term");
        for (Terms.Bucket bucket : terms.getBuckets()) {
            System.out.println(bucket.getKeyAsString() + ": " + bucket.getDocCount());
        }
        Histogram histogram = response.getAggregations().get("agg2");
        for (Histogram.Bucket bucket : histogram.getBuckets()) {
            System.out.println(bucket.getKeyAsString() + ": " + bucket.getDocCount());
        }
        // SearchResponse response2 = client.search(new SearchRequest().searchType(SearchType.QUERY_AND_FETCH)).actionGet();
    }

    /**
     * terminate_after: stops collecting after 1000 documents per shard and reports whether
     * that limit was hit.
     */
    @Test
    public void testTerminateAfter() {
        SearchResponse response = client.prepareSearch("twitter").setTerminateAfter(1000).get();
        // isTerminatedEarly() returns a Boolean that may be null, so avoid auto-unboxing.
        if (Boolean.TRUE.equals(response.isTerminatedEarly())) {
            System.out.println("ternimate");
        }
    }

    /**
     * Post filter with a range: greater than gt, less than lt, less or equal lte,
     * greater or equal gte. Prints the matching documents.
     */
    @Test
    public void testFilter() {
        SearchResponse response = client.prepareSearch("twitter")
                .setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery()) // match every document
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                // Since 2.x filters are ordinary queries: keep documents with 18 <= age <= 22.
                .setPostFilter(QueryBuilders.rangeQuery("age").gte(18).lte(22))
                // explain=true attaches to each hit an explanation of how its score was computed;
                // it does not change the ordering.
                .setExplain(true)
                .get();
        for (SearchHit hit : response.getHits().getHits()) {
            System.out.println(hit.getSourceAsString());
        }
    }

    /**
     * Group by: a terms aggregation on "user", printing each user with its document count.
     */
    @Test
    public void testGroupBy() {
        SearchResponse response = client.prepareSearch("twitter").setTypes("tweet")
                .setQuery(QueryBuilders.matchAllQuery())
                .setSearchType(SearchType.QUERY_THEN_FETCH)
                // Group by user. The default size is 10 buckets; in 2.x size(0) means
                // "return all buckets".
                .addAggregation(AggregationBuilders.terms("user").field("user").size(0))
                .get();
        Terms users = response.getAggregations().get("user");
        for (Terms.Bucket bucket : users.getBuckets()) {
            System.out.println(bucket.getKeyAsString() + ": " + bucket.getDocCount());
        }
    }
}
相关文章推荐
- elasticsearch基本操作之--使用java操作elasticsearch
- elasticsearch基本操作之--使用java操作elasticsearch
- Java使用基本JDK操作ZIP文件以及zip文件的加密、解密等功能
- 使用Java操作ElasticSearch1.7程序
- elasticsearch基本操作之--使用QueryBuilders进行查询
- java 关于使用java1.8的localDateTime日期操作的基本使用以及一些使用技巧 持续更新...
- JAVA IDE IntelliJ IDEA使用简介(二)—之基本操作
- java中简述使用JDBC完成数据库操作的基本步骤。
- 使用java客户端操作elasticsearch
- ElasticSearch之Java的基本操作一
- elasticsearch基本操作之--使用QueryBuilders进行查询
- 最好的JAVA IDE IntelliJ IDEA使用简介(二)之基本操作
- 基于Elasticsearch2.1.1的JavaAPI基本操作代码示例
- Java使用基本JDK操作ZIP文件
- JAVA IDE IntelliJ IDEA使用简介(二)—之基本操作
- Java 使用HashMap基本操作
- 使用JAVA的开源API-JExcelAPI来操作Excel,实现基本的功能
- Java使用MySQL数据库基本操作
- 使用JAVA控制AD域进行基本的操作详解
- ElasticSearch的Java Api基本操作入门指南