
Elasticsearch High-Level REST Client (to be continued...)

2018-07-15 11:38

Elasticsearch High-Level REST Client

Getting Started

This document is based on Elasticsearch 6.x and assumes that the reader is familiar with Elasticsearch syntax.

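Compatibility

The high-level client requires Java 1.8 at minimum and depends on the Elasticsearch core project. The client version should match the Elasticsearch version. The high-level client accepts the same request arguments as the TransportClient (the TCP-based client) and returns the same response objects. If you want to migrate from the TransportClient to the REST client, refer to the migration guide.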

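The high-level client is guaranteed to be able to communicate with any Elasticsearch node running on the same major version and on an equal or greater minor version. It does not need to be on the same minor version as the nodes it communicates with, because it is forward compatible: it supports communicating with later versions of Elasticsearch than the one it was developed for.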

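The 6.0 client can communicate with any 6.x Elasticsearch node, while the 6.1 client is guaranteed to communicate with 6.1, 6.2, and any later 6.x version. There may be incompatibilities when it communicates with a node on an earlier minor version, for example a 6.1 client and a 6.0 node: the 6.1 client may support new request body fields for some APIs that the 6.0 node does not know.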
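The compatibility rule above can be expressed as a small check. This is only a sketch: VersionCompat and isGuaranteedCompatible are hypothetical names, not part of the Elasticsearch client, and version strings are assumed to have the form major.minor[.patch].

```java
// Hypothetical helper, not part of the Elasticsearch client API.
final class VersionCompat {

    // Returns true when the guarantee described above applies:
    // same major version, and the node's minor version is equal or greater.
    static boolean isGuaranteedCompatible(String clientVersion, String nodeVersion) {
        int[] client = parse(clientVersion);
        int[] node = parse(nodeVersion);
        return client[0] == node[0] && node[1] >= client[1];
    }

    // Parses "major.minor[.patch]" into {major, minor}.
    private static int[] parse(String version) {
        String[] parts = version.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Expected major.minor[.patch]: " + version);
        }
        return new int[] { Integer.parseInt(parts[0]), Integer.parseInt(parts[1]) };
    }

    public static void main(String[] args) {
        System.out.println(isGuaranteedCompatible("6.1.0", "6.2.4")); // node has a newer minor version
        System.out.println(isGuaranteedCompatible("6.1.0", "6.0.1")); // node has an older minor version
    }
}
```

A false result does not mean that communication will fail, only that it falls outside the guarantee.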

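It is recommended to upgrade the high-level client when upgrading the Elasticsearch cluster to a new major version, because breaking changes in the REST API may cause unexpected results depending on which node a request hits, and newly added APIs are only supported by newer versions of the client. The client should be updated last, once all the nodes in the cluster have been upgraded to the new major version.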

Javadoc

The Javadoc is available at:
<https://artifacts.elastic.co/javadoc/org/elasticsearch/client/elasticsearch-rest-high-level-client/6.3.1/index.html>


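Maven Repository

The high-level Java REST client is hosted on Maven Central. The minimum Java version required is 1.8.

The high-level REST client follows the same release cycle as Elasticsearch. Replace the version in the configurations below with the desired client version.

If you are looking for a SNAPSHOT version, the Elastic Maven Snapshot repository is available at https://snapshots.elastic.co/maven/.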

Maven configuration

<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>6.3.1</version>
</dependency>

Gradle configuration

dependencies {
    compile 'org.elasticsearch.client:elasticsearch-rest-high-level-client:6.3.1'
}

Dependencies

The high-level client depends on the following artifacts and their transitive dependencies:

org.elasticsearch.client:elasticsearch-rest-client

org.elasticsearch:elasticsearch

Initialization

A RestHighLevelClient instance needs a low-level REST client builder to be built, as follows:

RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(
new HttpHost("localhost", 9200, "http"),
new HttpHost("localhost", 9201, "http")));

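The high-level client internally creates the low-level client used to perform requests, based on the provided builder, and manages its lifecycle.

The high-level client instance should be closed when it is no longer needed, so that all the resources it uses are properly released, including the underlying HTTP client instance and its threads. This is done through the close method, which closes the internal RestClient instance.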

client.close();

Document API

The Java high-level REST client supports the following document APIs:

Single-document APIs:

Index API

Get API

Delete API

Update API

Multi-document APIs:

Bulk API

Multi-Get API

Index API

Index Request

An IndexRequest requires the following arguments:

IndexRequest request = new IndexRequest(
        "posts", // index name
        "doc",   // type name
        "1");    // document ID
String jsonString = "{" +
        "\"user\":\"kimchy\"," +
        "\"postDate\":\"2013-01-30\"," +
        "\"message\":\"trying out Elasticsearch\"" +
        "}";
request.source(jsonString, XContentType.JSON); // document source provided as a String

Providing the document source

Besides the String example shown above, the document source can be provided in other ways.

Option 1: as a Map, which is automatically converted to JSON format

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("user", "kimchy");
jsonMap.put("postDate", new Date());
jsonMap.put("message", "trying out Elasticsearch");
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source(jsonMap);
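To make the Map-to-JSON conversion concrete, here is a minimal sketch written with the JDK only. It is not the client's serializer: MapToJsonSketch is a hypothetical class, it handles only strings, numbers, booleans, null, dates, and nested maps, and writing dates as ISO-8601 strings is an assumption of this sketch.

```java
import java.util.Date;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Hypothetical illustration; the real client uses its own XContent machinery.
final class MapToJsonSketch {

    // Serializes a map of field names to values as a JSON object.
    static String toJson(Map<String, ?> map) {
        StringJoiner fields = new StringJoiner(",", "{", "}");
        for (Map.Entry<String, ?> entry : map.entrySet()) {
            fields.add(quote(entry.getKey()) + ":" + value(entry.getValue()));
        }
        return fields.toString();
    }

    @SuppressWarnings("unchecked")
    private static String value(Object v) {
        if (v == null || v instanceof Number || v instanceof Boolean) {
            return String.valueOf(v);
        }
        if (v instanceof Date) {
            // assumption of this sketch: dates become ISO-8601 strings
            return quote(((Date) v).toInstant().toString());
        }
        if (v instanceof Map) {
            return toJson((Map<String, ?>) v);
        }
        return quote(v.toString());
    }

    // Wraps a string in quotes and escapes quotes, backslashes, and control characters.
    private static String quote(String s) {
        StringBuilder sb = new StringBuilder("\"");
        for (char c : s.toCharArray()) {
            if (c == '"' || c == '\\') {
                sb.append('\\').append(c);
            } else if (c < 0x20) {
                sb.append(String.format("\\u%04x", (int) c));
            } else {
                sb.append(c);
            }
        }
        return sb.append('"').toString();
    }

    public static void main(String[] args) {
        Map<String, Object> jsonMap = new LinkedHashMap<>();
        jsonMap.put("user", "kimchy");
        jsonMap.put("postDate", new Date());
        jsonMap.put("message", "trying out Elasticsearch");
        System.out.println(toJson(jsonMap));
    }
}
```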

Option 2: as an XContentBuilder object, using the built-in Elasticsearch helpers to generate JSON content

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
builder.field("user", "kimchy");
builder.timeField("postDate", new Date());
builder.field("message", "trying out Elasticsearch");
}
builder.endObject();
IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source(builder);

Option 3: as key-value pairs, which are converted to JSON format

IndexRequest indexRequest = new IndexRequest("posts", "doc", "1")
.source("user", "kimchy",
"postDate", new Date(),
"message", "trying out Elasticsearch");

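Optional arguments

The following arguments can optionally be provided:

Routing value

request.routing("routing");

Parent value

request.parent("parent");

Timeout, as a TimeValue or as a String

request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");

Refresh policy, as a WriteRequest.RefreshPolicy instance or as a String

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");

Version

request.version(2);

Version type

request.versionType(VersionType.EXTERNAL);

Operation type, as a DocWriteRequest.OpType value or as a String

request.opType(DocWriteRequest.OpType.CREATE);
request.opType("create");

Name of the ingest pipeline to execute before indexing the document

request.setPipeline("pipeline");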

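Synchronous execution

IndexResponse indexResponse = client.index(request);

Asynchronous execution

Executing an index request asynchronously requires both the IndexRequest instance and an ActionListener instance to be passed to the asynchronous method:

ActionListener<IndexResponse> listener = new ActionListener<IndexResponse>() {
    @Override
    public void onResponse(IndexResponse indexResponse) {
        // called when the execution completes successfully
    }

    @Override
    public void onFailure(Exception e) {
        // called when the request fails
    }
};
client.indexAsync(request, listener);

The asynchronous method does not block and returns immediately. Once the request completes, the ActionListener is called back through its onResponse method if the execution succeeded, or through its onFailure method if it failed.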
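The callback flow can be tried out without a cluster. The sketch below mimics it with the JDK only: Listener is a hypothetical stand-in for ActionListener, and executeAsync stands in for an asynchronous client method. It illustrates the pattern, not how the client is implemented.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Supplier;

// Hypothetical stand-ins; none of these types belong to the Elasticsearch client.
final class ListenerSketch {

    interface Listener<T> {
        void onResponse(T response);
        void onFailure(Exception e);
    }

    // Starts the action on another thread and returns immediately.
    static <T> CompletableFuture<Void> executeAsync(Supplier<T> action, Listener<T> listener) {
        return CompletableFuture.supplyAsync(action).handle((response, error) -> {
            if (error == null) {
                listener.onResponse(response);
            } else {
                // supplyAsync wraps failures in a CompletionException; unwrap it
                Throwable cause = error.getCause() != null ? error.getCause() : error;
                listener.onFailure(cause instanceof Exception
                        ? (Exception) cause : new RuntimeException(cause));
            }
            return null;
        });
    }

    // Runs an action and reports which callback fired.
    static String run(Supplier<String> action) {
        StringBuilder out = new StringBuilder();
        executeAsync(action, new Listener<String>() {
            @Override
            public void onResponse(String response) {
                out.append("response: ").append(response);
            }

            @Override
            public void onFailure(Exception e) {
                out.append("failure: ").append(e.getMessage());
            }
        }).join(); // join only so that this demo can read the outcome
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(run(() -> "indexed"));
        System.out.println(run(() -> { throw new IllegalStateException("node unavailable"); }));
    }
}
```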

Index Response

The returned IndexResponse contains information about the executed operation, as follows:

String index = indexResponse.getIndex();
String type = indexResponse.getType();
String id = indexResponse.getId();
long version = indexResponse.getVersion();
if (indexResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document was created
} else if (indexResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // an existing document was updated
}
ReplicationResponse.ShardInfo shardInfo = indexResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {
    // handle the case where fewer shards succeeded than the total number of shards
}
if (shardInfo.getFailed() > 0) { // handle potential failures
    for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
        String reason = failure.reason();
    }
}

If there is a version conflict, an ElasticsearchException is thrown:

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .version(1);
try {
    IndexResponse response = client.index(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // the raised exception indicates that a version conflict error was returned
    }
}

The same conflict occurs when opType is set to create and a document with the same index, type, and ID already exists:

IndexRequest request = new IndexRequest("posts", "doc", "1")
        .source("field", "value")
        .opType(DocWriteRequest.OpType.CREATE);
try {
    IndexResponse response = client.index(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // the raised exception indicates that a version conflict error was returned
    }
}
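Both conflict cases follow from a simple rule, which this in-memory sketch illustrates. VersionedStore and ConflictException are hypothetical names, and the rule is a simplification of internal versioning, not the server's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of the two conflict cases above.
final class VersionedStore {

    static final class ConflictException extends RuntimeException {
        ConflictException(String message) {
            super(message);
        }
    }

    // Current version per document ID; an absent key means the document does not exist.
    private final Map<String, Long> versions = new HashMap<>();

    // expectedVersion == null disables the version check.
    // createOnly == true mimics opType "create".
    long index(String id, Long expectedVersion, boolean createOnly) {
        Long current = versions.get(id);
        if (createOnly && current != null) {
            throw new ConflictException("document already exists: " + id);
        }
        if (expectedVersion != null && !expectedVersion.equals(current)) {
            throw new ConflictException("current version [" + current
                    + "] is different than the one provided [" + expectedVersion + "]");
        }
        long next = current == null ? 1L : current + 1L;
        versions.put(id, next);
        return next;
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore();
        System.out.println(store.index("1", null, true)); // created
        System.out.println(store.index("1", 1L, false));  // version check passes
        try {
            store.index("1", 1L, false);                  // stale version
        } catch (ConflictException e) {
            System.out.println("conflict: " + e.getMessage());
        }
    }
}
```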

Get API

Get Request

A GetRequest requires the following arguments:

GetRequest getRequest = new GetRequest("posts","doc","1");

Optional arguments

Disable source retrieval (enabled by default)

request.fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE);

Configure source inclusion for specific fields

String[] includes = new String[]{"message", "*Date"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);

Configure source exclusion for specific fields

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"message"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.fetchSourceContext(fetchSourceContext);
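The effect of include and exclude patterns such as "*Date" can be approximated as follows. This is a sketch under simplifying assumptions: SourceFilterSketch is a hypothetical class, only top-level field names are considered, and * is treated as "any run of characters". The server-side filtering also handles nested paths.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.regex.Pattern;

// Hypothetical approximation of source filtering on top-level fields.
final class SourceFilterSketch {

    // True if the field name matches the pattern, where '*' matches any run of characters.
    static boolean matches(String pattern, String field) {
        String[] parts = pattern.split("\\*", -1);
        StringBuilder regex = new StringBuilder();
        for (int i = 0; i < parts.length; i++) {
            if (i > 0) {
                regex.append(".*");
            }
            regex.append(Pattern.quote(parts[i]));
        }
        return field.matches(regex.toString());
    }

    private static boolean matchesAny(String[] patterns, String field) {
        for (String pattern : patterns) {
            if (matches(pattern, field)) {
                return true;
            }
        }
        return false;
    }

    // Keeps a field if it is included (or no includes are given) and not excluded.
    static Map<String, Object> filter(Map<String, Object> source,
                                      String[] includes, String[] excludes) {
        Map<String, Object> result = new LinkedHashMap<>();
        for (Map.Entry<String, Object> entry : source.entrySet()) {
            boolean included = includes.length == 0 || matchesAny(includes, entry.getKey());
            if (included && !matchesAny(excludes, entry.getKey())) {
                result.put(entry.getKey(), entry.getValue());
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Object> source = new LinkedHashMap<>();
        source.put("user", "kimchy");
        source.put("postDate", "2013-01-30");
        source.put("message", "trying out Elasticsearch");
        System.out.println(filter(source, new String[]{"message", "*Date"}, new String[0]));
        System.out.println(filter(source, new String[0], new String[]{"message"}));
    }
}
```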


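Configure retrieval of specific stored fields

request.storedFields("message"); // retrieve the stored field "message" (requires the field to be stored separately in the mapping)
GetResponse getResponse = client.get(request);
String message = getResponse.getField("message").getValue(); // get the stored value of "message" (requires the field to be stored separately in the mapping)

Routing value

request.routing("routing");

Parent value

request.parent("parent");

Preference value

request.preference("preference");

Realtime flag (true by default)

request.realtime(false);

Perform a refresh before retrieving the document (false by default)

request.refresh(true);

Version

request.version(2);

Version type

request.versionType(VersionType.EXTERNAL);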

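Synchronous execution

GetResponse getResponse = client.get(getRequest);

Asynchronous execution

Executing a get request asynchronously requires both the GetRequest instance and an ActionListener instance to be passed to the asynchronous method:

ActionListener<GetResponse> listener = new ActionListener<GetResponse>() {
    @Override
    public void onResponse(GetResponse getResponse) {
        // called when the execution completes successfully
    }

    @Override
    public void onFailure(Exception e) {
        // called when the request fails
    }
};
client.getAsync(getRequest, listener);

The asynchronous method does not block and returns immediately. Once the request completes, the ActionListener is called back through its onResponse method if the execution succeeded, or through its onFailure method if it failed.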

Get Response

The returned GetResponse gives access to the requested document and its metadata, as follows:

String index = getResponse.getIndex();
String type = getResponse.getType();
String id = getResponse.getId();
if (getResponse.isExists()) {
    long version = getResponse.getVersion();
    String sourceAsString = getResponse.getSourceAsString();
    Map<String, Object> sourceAsMap = getResponse.getSourceAsMap();
    byte[] sourceAsBytes = getResponse.getSourceAsBytes();
} else {
    // Handle the scenario where the document was not found. Although the returned
    // response has a 404 status code, a valid GetResponse is returned rather than an
    // exception being thrown. Such a response holds no source document, and its
    // isExists method returns false.
}

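When a get request is performed against an index that does not exist, the response has a 404 status code and an ElasticsearchException is thrown, which needs to be handled as follows: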

GetRequest request = new GetRequest("does_not_exist", "doc", "1");
try {
GetResponse getResponse = client.get(request);
} catch (ElasticsearchException e) {
if (e.status() == RestStatus.NOT_FOUND) {

}
}

If a specific document version is requested and the existing document has a different version number, a version conflict is raised:

try {
    GetRequest request = new GetRequest("posts", "doc", "1").version(2);
    GetResponse getResponse = client.get(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // handle the version conflict
    }
}

Exists API

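The Exists API returns true if a document exists, and false otherwise.

Exists Request

It uses a GetRequest, just like the Get API, and all of its optional arguments are supported. Since exists() only returns true or false, it is recommended to turn off fetching _source and any stored fields so that the request is lighter:

GetRequest getRequest = new GetRequest(
        "posts",
        "doc",
        "1");
getRequest.fetchSourceContext(new FetchSourceContext(false)); // do not fetch _source
getRequest.storedFields("_none_");                            // do not fetch stored fields

Synchronous execution

boolean exists = client.exists(getRequest);

Asynchronous execution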

ActionListener<Boolean> listener = new ActionListener<Boolean>() {
@Override
public void onResponse(Boolean exists) {
}
@Override
public void onFailure(Exception e) {
}
};
client.existsAsync(getRequest, listener);

Delete API

Delete Request

A DeleteRequest requires the following arguments:

DeleteRequest request = new DeleteRequest("posts", "doc", "1");

Optional arguments

Routing value

request.routing("routing");

Parent value

request.parent("parent");

Timeout, as a TimeValue or as a String

request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");

Refresh policy, as a WriteRequest.RefreshPolicy instance or as a String

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");

Version

request.version(2);

Version type

request.versionType(VersionType.EXTERNAL);

Synchronous execution

DeleteResponse deleteResponse = client.delete(request);

Asynchronous execution

ActionListener<DeleteResponse> listener = new ActionListener<DeleteResponse>() {
@Override
public void onResponse(DeleteResponse deleteResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.deleteAsync(request, listener);

Delete Response

The returned DeleteResponse contains information about the executed operation, as follows:

String index = deleteResponse.getIndex();
String type = deleteResponse.getType();
String id = deleteResponse.getId();
long version = deleteResponse.getVersion();
ReplicationResponse.ShardInfo shardInfo = deleteResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {

}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
}
}

The response also indicates whether the document was found:

DeleteRequest request = new DeleteRequest("posts", "doc", "does_not_exist");
DeleteResponse deleteResponse = client.delete(request);
if (deleteResponse.getResult() == DocWriteResponse.Result.NOT_FOUND) {
    // the document to delete was not found
}

If there is a version conflict, an ElasticsearchException is thrown:

try {
    DeleteRequest request = new DeleteRequest("posts", "doc", "1").version(2);
    DeleteResponse deleteResponse = client.delete(request);
} catch (ElasticsearchException exception) {
    if (exception.status() == RestStatus.CONFLICT) {
        // the raised exception indicates that a version conflict error was returned
    }
}

Update API

UpdateRequest

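An UpdateRequest requires the following arguments:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");

The Update API allows updating an existing document either by using a script or by passing a partial document.

Updating with a script

The script can be provided as an inline script:

Map<String, Object> parameters = singletonMap("count", 4); // script parameters provided as a Map
Script inline = new Script(ScriptType.INLINE, "painless",
        "ctx._source.field += params.count", parameters); // inline script using the painless language and the parameters above
UpdateRequest request = new UpdateRequest("posts", "doc", "1");
request.script(inline); // set the script on the update request

Or as a script stored in Elasticsearch:

Script stored = new Script(ScriptType.STORED, null, "increment-field", parameters); // reference the painless script stored in Elasticsearch under the name increment-field
request.script(stored);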
Updating with a partial document

When a partial document is used to update an existing document, the partial document is merged with the existing document.

The partial document can be provided in different ways:

UpdateRequest request = new UpdateRequest("posts", "doc", "1");
String jsonString = "{" +
        "\"updated\":\"2017-01-01\"," +
        "\"reason\":\"daily update\"" +
        "}";
request.doc(jsonString, XContentType.JSON); // partial document source provided as a JSON String

As a Map, which is automatically converted to JSON format:

Map<String, Object> jsonMap = new HashMap<>();
jsonMap.put("updated", new Date());
jsonMap.put("reason", "daily update");
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(jsonMap);

As an XContentBuilder object, using the built-in Elasticsearch helpers to generate JSON content:

XContentBuilder builder = XContentFactory.jsonBuilder();
builder.startObject();
{
    builder.timeField("updated", new Date());
    builder.field("reason", "daily update");
}
builder.endObject();
UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc(builder);

As key-value pairs, which are converted to JSON format:

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("updated", new Date(),
             "reason", "daily update");

Upserts

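If the document does not exist yet, the upsert method can be used to define content that is inserted as a new document:

String jsonString = "{\"created\":\"2017-01-01\"}";
request.upsert(jsonString, XContentType.JSON);

As with partial document updates, the content of the upsert document can be provided as a String, a Map, an XContentBuilder, or key-value pairs.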
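The interplay between a partial document and an upsert document can be modeled in memory. The sketch below is an illustration only: UpsertSketch is a hypothetical class, documents are plain maps, and the merge recurses into nested maps while replacing every other value.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory model of partial-document updates and upserts.
final class UpsertSketch {

    private final Map<String, Map<String, Object>> docs = new HashMap<>();

    // Merges partialDoc into an existing document, or inserts upsertDoc if none exists.
    // Returns "created" or "updated".
    String update(String id, Map<String, Object> partialDoc, Map<String, Object> upsertDoc) {
        Map<String, Object> existing = docs.get(id);
        if (existing == null) {
            if (upsertDoc == null) {
                throw new IllegalStateException("document missing: " + id);
            }
            docs.put(id, new HashMap<>(upsertDoc));
            return "created";
        }
        merge(existing, partialDoc);
        return "updated";
    }

    // Nested maps are merged recursively; any other value is replaced.
    @SuppressWarnings("unchecked")
    static void merge(Map<String, Object> target, Map<String, Object> changes) {
        for (Map.Entry<String, Object> change : changes.entrySet()) {
            Object old = target.get(change.getKey());
            if (old instanceof Map && change.getValue() instanceof Map) {
                Map<String, Object> copy = new HashMap<>((Map<String, Object>) old);
                merge(copy, (Map<String, Object>) change.getValue());
                target.put(change.getKey(), copy);
            } else {
                target.put(change.getKey(), change.getValue());
            }
        }
    }

    Map<String, Object> get(String id) {
        return docs.get(id);
    }

    public static void main(String[] args) {
        UpsertSketch store = new UpsertSketch();
        Map<String, Object> partial = new HashMap<>();
        partial.put("reason", "daily update");
        Map<String, Object> upsert = new HashMap<>();
        upsert.put("created", "2017-01-01");
        System.out.println(store.update("1", partial, upsert)); // no document yet
        System.out.println(store.update("1", partial, upsert)); // document exists now
        System.out.println(store.get("1"));
    }
}
```

Note that on the first call only the upsert document is stored; the partial document is applied on later calls, once the document exists.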

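Optional arguments

Routing value

request.routing("routing");

Parent value

request.parent("parent");

Timeout, as a TimeValue or as a String

request.timeout(TimeValue.timeValueSeconds(1));
request.timeout("1s");

Refresh policy, as a WriteRequest.RefreshPolicy instance or as a String

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");

Number of times to retry the update operation

The update is retried if the document to update has been changed by another operation between the get and indexing phases of the update operation.

request.retryOnConflict(3);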
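The retry behavior can be pictured as a plain loop around an operation that may fail with a conflict. This is a sketch of the general pattern, not the server-side implementation; RetryOnConflictSketch, VersionConflict, and withRetries are hypothetical names.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical illustration of retrying an operation on version conflicts.
final class RetryOnConflictSketch {

    static final class VersionConflict extends RuntimeException {
        VersionConflict(String message) {
            super(message);
        }
    }

    // Runs the attempt once, then retries up to maxRetries more times on conflict.
    // The last conflict is rethrown when the retries are exhausted.
    static <T> T withRetries(int maxRetries, Supplier<T> attempt) {
        for (int retries = 0; ; retries++) {
            try {
                return attempt.get();
            } catch (VersionConflict e) {
                if (retries >= maxRetries) {
                    throw e;
                }
            }
        }
    }

    public static void main(String[] args) {
        AtomicInteger calls = new AtomicInteger();
        String result = withRetries(3, () -> {
            // simulate a concurrent change during the first two attempts
            if (calls.incrementAndGet() < 3) {
                throw new VersionConflict("document changed between get and index");
            }
            return "updated";
        });
        System.out.println(result + " after " + calls.get() + " attempts");
    }
}
```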


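Enable retrieval of the updated document's source (false by default)

request.fetchSource(true);

Configure source inclusion for specific fields

String[] includes = new String[]{"updated", "r*"}; // wildcard pattern
String[] excludes = Strings.EMPTY_ARRAY;
request.fetchSource(new FetchSourceContext(true, includes, excludes));

Configure source exclusion for specific fields

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[]{"updated"};
request.fetchSource(new FetchSourceContext(true, includes, excludes));

Version

request.version(2);

Disable noop detection

request.detectNoop(false);

Indicate that the script must run regardless of whether the document exists, that is, the script takes care of creating the document if it does not exist yet

request.scriptedUpsert(true);

Indicate that the partial document must be used as the upsert document if the document does not exist yet

request.docAsUpsert(true);

Number of shard copies that must be active before proceeding with the update operation, as an int or as an ActiveShardCount

request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL);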

Synchronous execution

UpdateResponse updateResponse = client.update(request);

Asynchronous execution

client.updateAsync(request, new ActionListener<UpdateResponse>() {
@Override
public void onResponse(UpdateResponse updateResponse) {
}
@Override
public void onFailure(Exception e) {
}
});

UpdateResponse

The returned UpdateResponse contains information about the executed operation, as follows:

String index = updateResponse.getIndex();
String type = updateResponse.getType();
String id = updateResponse.getId();
long version = updateResponse.getVersion();
if (updateResponse.getResult() == DocWriteResponse.Result.CREATED) {
    // the document was created for the first time (upsert)
} else if (updateResponse.getResult() == DocWriteResponse.Result.UPDATED) {
    // the document was updated
} else if (updateResponse.getResult() == DocWriteResponse.Result.DELETED) {
    // the document was deleted
} else if (updateResponse.getResult() == DocWriteResponse.Result.NOOP) {
    // the document was not affected by the update, i.e. no operation (noop) was executed on it
}

When source retrieval is enabled on the UpdateRequest through the fetchSource method, the response contains the source of the updated document:

GetResult result = updateResponse.getGetResult();
if (result.isExists()) {
String sourceAsString = result.sourceAsString();
Map<String, Object> sourceAsMap = result.sourceAsMap();
byte[] sourceAsBytes = result.source();
} else {

}

It is also possible to check for shard failures:

ReplicationResponse.ShardInfo shardInfo = updateResponse.getShardInfo();
if (shardInfo.getTotal() != shardInfo.getSuccessful()) {

}
if (shardInfo.getFailed() > 0) {
for (ReplicationResponse.ShardInfo.Failure failure : shardInfo.getFailures()) {
String reason = failure.reason();
}
}

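When an UpdateRequest is performed against a document that does not exist, the response has a 404 status code and an ElasticsearchException is thrown, which needs to be handled as follows:

UpdateRequest request = new UpdateRequest("posts", "type", "does_not_exist")
        .doc("field", "value");
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.NOT_FOUND) {
        // handle the exception thrown because the document does not exist
    }
}

If there is a version conflict, an ElasticsearchException is thrown:

UpdateRequest request = new UpdateRequest("posts", "doc", "1")
        .doc("field", "value")
        .version(1);
try {
    UpdateResponse updateResponse = client.update(request);
} catch (ElasticsearchException e) {
    if (e.status() == RestStatus.CONFLICT) {
        // the raised exception indicates that a version conflict error was returned
    }
}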

Bulk API

BulkRequest

A BulkRequest can be used to execute multiple index, update, and/or delete operations in a single request.

It requires at least one operation to be added to the bulk request:

BulkRequest request = new BulkRequest();
request.add(new IndexRequest("posts", "doc", "1")
        .source(XContentType.JSON, "field", "foo"));
request.add(new IndexRequest("posts", "doc", "2")
        .source(XContentType.JSON, "field", "bar"));
request.add(new IndexRequest("posts", "doc", "3")
        .source(XContentType.JSON, "field", "baz"));

Different operation types can be added to the same BulkRequest:

BulkRequest request = new BulkRequest();
request.add(new DeleteRequest("posts", "doc", "3"));
request.add(new UpdateRequest("posts", "doc", "2")
.doc(XContentType.JSON,"other", "test"));
request.add(new IndexRequest("posts", "doc", "4")
.source(XContentType.JSON,"field", "baz"));

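Optional arguments

Timeout, as a TimeValue or as a String

request.timeout(TimeValue.timeValueMinutes(2));
request.timeout("2m");

Refresh policy, as a WriteRequest.RefreshPolicy instance or as a String

request.setRefreshPolicy(WriteRequest.RefreshPolicy.WAIT_UNTIL);
request.setRefreshPolicy("wait_for");

Number of shard copies that must be active before proceeding with the index, update, or delete operations

request.waitForActiveShards(2);
request.waitForActiveShards(ActiveShardCount.ALL); // one of ActiveShardCount.ALL, ActiveShardCount.ONE, or ActiveShardCount.DEFAULT

Synchronous execution

BulkResponse bulkResponse = client.bulk(request);

Asynchronous execution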

ActionListener<BulkResponse> listener = new ActionListener<BulkResponse>() {
@Override
public void onResponse(BulkResponse bulkResponse) {
}
@Override
public void onFailure(Exception e) {
}
};
client.bulkAsync(request, listener);

BulkResponse

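The returned BulkResponse contains information about the executed operations and allows iterating over each result, as follows:

for (BulkItemResponse bulkItemResponse : bulkResponse) {
// the response can be an IndexResponse, UpdateResponse, or DeleteResponse, all of which can be treated as DocWriteResponse instances
DocWriteResponse itemResponse = bulkItemResponse.getResponse();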

if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.INDEX
|| bulkItemResponse.getOpType() == DocWriteRequest.OpType.CREATE) {
IndexResponse indexResponse = (IndexResponse) itemResponse;

} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.UPDATE) {
UpdateResponse updateResponse = (UpdateResponse) itemResponse;

} else if (bulkItemResponse.getOpType() == DocWriteRequest.OpType.DELETE) {
DeleteResponse deleteResponse = (DeleteResponse) itemResponse;
}
}

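The bulk response provides a method to quickly check whether one or more operations failed:

if (bulkResponse.hasFailures()) {
    // hasFailures returns true if at least one operation failed
}

In that case, it is necessary to iterate over all operation results to check whether each operation failed and, if so, retrieve the corresponding failure: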

for(BulkItemResponse bulkItemResponse:bulkResponse){
if(bulkItemResponse.isFailed()){
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();

}
}

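Bulk Processor

The BulkProcessor is a utility class that simplifies use of the Bulk API: it transparently executes the index, update, and delete operations as they are added to the processor.

To execute the requests, the BulkProcessor requires the following components:

RestHighLevelClient

This client is used to execute the BulkRequest and to retrieve the BulkResponse.

BulkProcessor.Listener

This listener is called before and after every BulkRequest execution, or when a BulkRequest fails.

The BulkProcessor.builder method can then be used to build a new BulkProcessor: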

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {

}

@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {

}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {

}
};

BulkProcessor bulkProcessor =
BulkProcessor.builder(client::bulkAsync, listener).build();

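The BulkProcessor.Builder provides methods to configure how the BulkProcessor handles request execution:

BulkProcessor.Builder builder = BulkProcessor.builder(client::bulkAsync, listener);
builder.setBulkActions(500); // flush a new bulk request once this many actions have been added (defaults to 1000, use -1 to disable it)
builder.setBulkSize(new ByteSizeValue(1L, ByteSizeUnit.MB)); // flush a new bulk request once the added actions reach this size (defaults to 5 MB, use -1 to disable it)
builder.setConcurrentRequests(0); // number of concurrent requests allowed to execute (defaults to 1, use 0 to allow only a single request to execute)
builder.setFlushInterval(TimeValue.timeValueSeconds(10L)); // flush any pending BulkRequest when this interval elapses (not set by default)
builder.setBackoffPolicy(BackoffPolicy
        .constantBackoff(TimeValue.timeValueSeconds(1L), 3)); // constant backoff policy that initially waits 1 second and retries up to 3 times; see BackoffPolicy.noBackoff(), BackoffPolicy.constantBackoff(), and BackoffPolicy.exponentialBackoff() for more options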
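To show what a backoff policy amounts to, the sketch below computes the wait times between retries for a constant policy and for a simple doubling policy. BackoffSketch is a hypothetical class; in particular, the doubling schedule is a generic illustration and not the formula used by BackoffPolicy.exponentialBackoff().

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration of retry delay schedules.
final class BackoffSketch {

    // Constant policy: the same delay before every retry.
    static List<Long> constantDelaysMillis(long delayMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delayMillis);
        }
        return delays;
    }

    // Doubling policy: each delay is twice the previous one
    // (an assumption of this sketch, not the Elasticsearch formula).
    static List<Long> doublingDelaysMillis(long initialMillis, int maxRetries) {
        List<Long> delays = new ArrayList<>();
        long delay = initialMillis;
        for (int i = 0; i < maxRetries; i++) {
            delays.add(delay);
            delay *= 2;
        }
        return delays;
    }

    public static void main(String[] args) {
        // mirrors constantBackoff(1 second, 3 retries) from the snippet above
        System.out.println(constantDelaysMillis(1000L, 3));
        System.out.println(doublingDelaysMillis(100L, 4));
    }
}
```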

Once the BulkProcessor is created, requests can be added to it:

IndexRequest one = new IndexRequest("posts", "doc", "1").
source(XContentType.JSON, "title",
"In which order are my Elasticsearch queries executed?");
IndexRequest two = new IndexRequest("posts", "doc", "2")
.source(XContentType.JSON, "title",
"Current status and upcoming changes in Elasticsearch");
IndexRequest three = new IndexRequest("posts", "doc", "3")
.source(XContentType.JSON, "title",
"The Future of Federated Search in Elasticsearch");

bulkProcessor.add(one);
bulkProcessor.add(two);
bulkProcessor.add(three);

The BulkProcessor executes all the requests and calls back the BulkProcessor.Listener for every BulkRequest. The listener provides methods to access the BulkRequest and the BulkResponse:

BulkProcessor.Listener listener = new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId, BulkRequest request) {
int numberOfActions = request.numberOfActions();
logger.debug("Executing bulk [{}] with {} requests",
executionId, numberOfActions);
}

@Override
public void afterBulk(long executionId, BulkRequest request,
BulkResponse response) {
if (response.hasFailures()) {
logger.warn("Bulk [{}] executed with failures", executionId);
} else {
logger.debug("Bulk [{}] completed in {} milliseconds",
executionId, response.getTook().getMillis());
}
}

@Override
public void afterBulk(long executionId, BulkRequest request, Throwable failure) {
// called when the bulk request fails
logger.error("Failed to execute bulk", failure);
}
};

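Once all requests have been added to the BulkProcessor, its instance needs to be closed, using one of two closing methods.

The awaitClose() method can be used to wait until all requests have been processed or until the specified waiting time elapses:

boolean terminated = bulkProcessor.awaitClose(30L, TimeUnit.SECONDS); // true if all bulk requests completed, false if the waiting time elapsed before they all completed

The close() method can be used to immediately close the BulkProcessor:

bulkProcessor.close();

Both methods flush the requests added to the processor before closing it, and forbid adding any new request to it.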
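The batching behavior described in this section (flush when a threshold is reached, flush what remains on close, reject additions afterwards) can be sketched with a small buffer. BatchingSketch is a hypothetical class that only models the action-count threshold; the real BulkProcessor also handles size thresholds, flush intervals, concurrency, and retries.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, single-threaded model of count-based batching.
final class BatchingSketch<T> implements AutoCloseable {

    private final int bulkActions;
    private final Consumer<List<T>> sender;
    private List<T> pending = new ArrayList<>();
    private boolean closed;

    BatchingSketch(int bulkActions, Consumer<List<T>> sender) {
        this.bulkActions = bulkActions;
        this.sender = sender;
    }

    // Buffers the action and flushes once the threshold is reached.
    void add(T action) {
        if (closed) {
            throw new IllegalStateException("processor already closed");
        }
        pending.add(action);
        if (pending.size() >= bulkActions) {
            flush();
        }
    }

    // Sends the buffered actions as one batch, if there are any.
    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        List<T> batch = pending;
        pending = new ArrayList<>();
        sender.accept(batch);
    }

    // Flushes what is left and forbids further additions.
    @Override
    public void close() {
        if (closed) {
            return;
        }
        closed = true;
        flush();
    }

    public static void main(String[] args) {
        try (BatchingSketch<String> processor =
                     new BatchingSketch<>(2, batch -> System.out.println("sending " + batch))) {
            processor.add("index 1");
            processor.add("index 2"); // threshold reached, first batch is sent
            processor.add("index 3"); // sent when the processor is closed
        }
    }
}
```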

Multi-Get API

The multi-get API executes multiple get requests in a single request.

Multi-Get Request

A MultiGetRequest is created empty, and MultiGetRequest.Item instances are then added to it:

MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item(
"index",
"type",
"example_id"));
request.add(new MultiGetRequest.Item("index", "type", "another_id"));

Optional arguments

The multi-get API supports the same optional arguments as the Get API. They can be set on each Item:

Disable source retrieval (enabled by default)

request.add(new MultiGetRequest.Item("index", "type", "example_id")
        .fetchSourceContext(FetchSourceContext.DO_NOT_FETCH_SOURCE));

Configure source inclusion for specific fields

String[] includes = new String[] {"foo", "*r"};
String[] excludes = Strings.EMPTY_ARRAY;
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "type", "example_id")
        .fetchSourceContext(fetchSourceContext));

Configure source exclusion for specific fields

String[] includes = Strings.EMPTY_ARRAY;
String[] excludes = new String[] {"foo", "*r"};
FetchSourceContext fetchSourceContext =
        new FetchSourceContext(true, includes, excludes);
request.add(new MultiGetRequest.Item("index", "type", "example_id")
        .fetchSourceContext(fetchSourceContext));

Configure retrieval of specific stored fields

request.add(new MultiGetRequest.Item("index", "type", "example_id")
        .storedFields("foo")); // retrieve the stored field foo
MultiGetResponse response = client.multiGet(request);
MultiGetItemResponse item = response.getResponses()[0];
String value = item.getResponse().getField("foo").getValue(); // get the value of the stored field foo

Other optional arguments

request.add(new MultiGetRequest.Item("index", "type", "with_routing")
        .routing("some_routing")); // routing value
request.add(new MultiGetRequest.Item("index", "type", "with_parent")
        .parent("some_parent")); // parent value
request.add(new MultiGetRequest.Item("index", "type", "with_version")
        .versionType(VersionType.EXTERNAL) // version type
        .version(10123L)); // version
request.preference("some_preference"); // preference value
request.realtime(false); // realtime flag (true by default)
request.refresh(true); // perform a refresh before retrieving the documents (false by default)

Synchronous execution

MultiGetResponse response = client.multiGet(request);

Asynchronous execution

ActionListener<MultiGetResponse> listener = new ActionListener<MultiGetResponse>() {
    @Override
    public void onResponse(MultiGetResponse response) {
        // called when the execution completes successfully
    }

    @Override
    public void onFailure(Exception e) {
        // called when the request fails
    }
};
client.multiGetAsync(request, listener);

MultiGetResponse

The returned MultiGetResponse contains a list of MultiGetItemResponse objects, available through getResponses, in the same order as the requests. Each MultiGetItemResponse contains a GetResponse if the get succeeded, or a MultiGetResponse.Failure if it failed.

MultiGetItemResponse firstItem = response.getResponses()[0];
assertNull(firstItem.getFailure());             // getFailure returns null because the get succeeded
GetResponse firstGet = firstItem.getResponse(); // get the GetResponse
String index = firstItem.getIndex();
String type = firstItem.getType();
String id = firstItem.getId();
if (firstGet.isExists()) { // check whether the document exists
    long version = firstGet.getVersion();
    String sourceAsString = firstGet.getSourceAsString();
    Map<String, Object> sourceAsMap = firstGet.getSourceAsMap();
    byte[] sourceAsBytes = firstGet.getSourceAsBytes();
} else {
    // handle the scenario where the document was not found
}

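If a requested index does not exist, the response for that item contains an exception (here missingIndexItem is the MultiGetItemResponse of the item that targeted the missing index):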

assertNull(missingIndexItem.getResponse());
Exception e = missingIndexItem.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.NOT_FOUND, ee.status());
assertThat(e.getMessage(),
containsString("reason=no such index"));

If there is a version conflict for a requested document, the response for that item contains an exception:

MultiGetRequest request = new MultiGetRequest();
request.add(new MultiGetRequest.Item("index", "type", "example_id")
        .version(1000L));
MultiGetResponse response = client.multiGet(request);
MultiGetItemResponse item = response.getResponses()[0];
assertNull(item.getResponse());
Exception e = item.getFailure().getFailure();
ElasticsearchException ee = (ElasticsearchException) e;
// TODO status is broken! fix in a followup
// assertEquals(RestStatus.CONFLICT, ee.status());
assertThat(e.getMessage(),
containsString("version conflict, current version [1] is "
+ "different than the one provided [1000]"));

Search API

The high-level client supports the following search APIs:

Search API

Search Scroll API

Clear Scroll API

Multi-Search API

Ranking Evaluation API

Search API

SearchRequest

SearchRequest searchRequest = new SearchRequest();
SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.query(QueryBuilders.matchAllQuery());
searchRequest.source(searchSourceBuilder);

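Optional arguments

SearchRequest searchRequest = new SearchRequest("posts"); // restrict the request to an index
searchRequest.types("doc"); // restrict the request to a type
searchRequest.routing("routing"); // set a routing parameter
searchRequest.indicesOptions(IndicesOptions.lenientExpandOpen()); // control how unavailable indices are resolved and how wildcard expressions are expanded
searchRequest.preference("_local"); // prefer executing the search on local shards

Using the SearchSourceBuilder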

SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
sourceBuilder.query(QueryBuilders.termQuery("user", "kimchy"));
sourceBuilder.from(0);
sourceBuilder.size(5);
sourceBuilder.timeout(new TimeValue(60, TimeUnit.SECONDS));
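The from and size options implement offset-based paging. As a small aid, the sketch below converts a zero-based page number into the from offset and computes the number of pages for a given hit count. PagingSketch and its methods are hypothetical helpers, not part of the client.

```java
// Hypothetical paging helpers for the from/size options.
final class PagingSketch {

    // Offset of the first hit of a zero-based page.
    static int fromOffset(int page, int size) {
        if (page < 0 || size <= 0) {
            throw new IllegalArgumentException("page must be >= 0 and size must be > 0");
        }
        return Math.multiplyExact(page, size);
    }

    // Number of pages needed to show totalHits hits, size hits per page.
    static long pageCount(long totalHits, int size) {
        if (totalHits < 0 || size <= 0) {
            throw new IllegalArgumentException("totalHits must be >= 0 and size must be > 0");
        }
        return (totalHits + size - 1) / size;
    }

    public static void main(String[] args) {
        int size = 5;
        System.out.println(fromOffset(0, size)); // first page starts at offset 0
        System.out.println(fromOffset(2, size)); // third page
        System.out.println(pageCount(11, size)); // pages needed for 11 hits
    }
}
```

The computed offset would then be passed to sourceBuilder.from(...) together with sourceBuilder.size(size).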

Building queries

MatchQueryBuilder matchQueryBuilder = new MatchQueryBuilder("user", "kimchy");
matchQueryBuilder.fuzziness(Fuzziness.AUTO); // enable fuzzy matching
matchQueryBuilder.prefixLength(3);           // set the prefix length option
matchQueryBuilder.maxExpansions(10);         // set the max expansions option
searchSourceBuilder.query(matchQueryBuilder);

Sorting

sourceBuilder.sort(new ScoreSortBuilder().order(SortOrder.DESC)); // sort by _score, descending
sourceBuilder.sort(new FieldSortBuilder("_uid").order(SortOrder.ASC)); // then sort by the _uid field, ascending

Source filtering

sourceBuilder.fetchSource(false); // turn off _source retrieval altogether
String[] includeFields = new String[] {"title", "user", "innerObject.*"};
String[] excludeFields = new String[] {"_type"};
sourceBuilder.fetchSource(includeFields, excludeFields); // or include and exclude specific fields, optionally with wildcard patterns

Highlighting

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
HighlightBuilder highlightBuilder = new HighlightBuilder();
HighlightBuilder.Field highlightTitle =
new HighlightBuilder.Field("title");
highlightTitle.highlighterType("unified");
highlightBuilder.field(highlightTitle);
HighlightBuilder.Field highlightUser = new HighlightBuilder.Field("user");
highlightBuilder.field(highlightUser);
searchSourceBuilder.highlighter(highlightBuilder);

Requesting aggregations

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
TermsAggregationBuilder aggregation = AggregationBuilders.terms("by_company")
.field("company.keyword");
aggregation.subAggregation(AggregationBuilders.avg("average_age")
.field("age"));
searchSourceBuilder.aggregation(aggregation);

Requesting suggestions

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
SuggestionBuilder termSuggestionBuilder =
SuggestBuilders.termSuggestion("user").text("kmichy");
SuggestBuilder suggestBuilder = new SuggestBuilder();
suggestBuilder.addSuggestion("suggest_user", termSuggestionBuilder);
searchSourceBuilder.suggest(suggestBuilder);

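Profiling queries and aggregations

The Profile API can be used to profile the execution of queries and aggregations for a specific search request. To use it, the profile flag must be set to true on the SearchSourceBuilder: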

SearchSourceBuilder searchSourceBuilder = new SearchSourceBuilder();
searchSourceBuilder.profile(true);

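Once the SearchRequest is executed, the corresponding SearchResponse contains the profiling results.

Synchronous execution

SearchResponse searchResponse = client.search(searchRequest);

Asynchronous execution

A SearchRequest can also be executed asynchronously, so that the client can return directly. Users specify how the response or potential failures are handled by passing the request and a listener to the asynchronous search method: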

ActionListener<SearchResponse> listener = new ActionListener<SearchResponse>() {
@Override
public void onResponse(SearchResponse searchResponse) {

}

@Override
public void onFailure(Exception e) {

}
};
client.searchAsync(searchRequest, listener);

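The asynchronous method does not block and returns immediately. Once the request completes, the ActionListener is called back through its onResponse method if the execution succeeded, or through its onFailure method if it failed.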

SearchResponse

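The SearchResponse returned by executing the search provides details about the search execution itself, as well as access to the documents returned. First, there is information about the request execution, such as the HTTP status code, the execution time, and whether the request terminated early or timed out: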

RestStatus status = searchResponse.status();
TimeValue took = searchResponse.getTook();
Boolean terminatedEarly = searchResponse.isTerminatedEarly();
boolean timedOut = searchResponse.isTimedOut();

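Second, the response provides information about the execution at the shard level: the total number of shards affected by the search, and how many of them succeeded or failed. Potential failures can be handled by iterating over the array of ShardSearchFailure objects, as in the following example: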

int totalShards = searchResponse.getTotalShards();
int successfulShards = searchResponse.getSuccessfulShards();
int failedShards = searchResponse.getFailedShards();
for (ShardSearchFailure failure : searchResponse.getShardFailures()) {
// failures should be handled here
}

Retrieving SearchHits

To access the returned documents, first get the SearchHits contained in the response:

SearchHits hits = searchResponse.getHits();

The SearchHits provide global information about all hits, such as the total number of hits or the maximum score:

long totalHits = hits.getTotalHits();
float maxScore = hits.getMaxScore();

Nested inside the SearchHits are the individual search results, which can be iterated over:

SearchHit[] searchHits = hits.getHits();
for (SearchHit hit : searchHits) {
// do something with the SearchHit
}

The SearchHit provides access to basic information such as the index, type, document ID, and score of each search hit:

String index = hit.getIndex();
String type = hit.getType();
String id = hit.getId();
float score = hit.getScore();

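In addition, it lets you get back the document source, either as a simple JSON string or as a map of key/value pairs. In this map, regular fields are keyed by the field name and contain the field value. Multi-valued fields are returned as lists of objects, and nested objects as another key/value map. These cases need to be cast accordingly: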

String sourceAsString = hit.getSourceAsString();
Map<String, Object> sourceAsMap = hit.getSourceAsMap();
String documentTitle = (String) sourceAsMap.get("title");
List<Object> users = (List<Object>) sourceAsMap.get("user");
Map<String, Object> innerObject =
(Map<String, Object>) sourceAsMap.get("innerObject");
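Because the source map is typed as Map<String, Object>, unchecked casts like the ones above fail at runtime when a field holds a different shape than expected. The following sketch shows a more defensive way to read such a map; SourceMapSketch and its methods are hypothetical helpers that use only the JDK.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helpers for reading values from a source map defensively.
final class SourceMapSketch {

    // Returns the field as a String, or null if it is absent or not a String.
    static String stringField(Map<String, Object> source, String name) {
        Object value = source.get(name);
        return value instanceof String ? (String) value : null;
    }

    // Returns the field as a list: multi-valued fields as-is, a single value
    // wrapped in a one-element list, and an absent field as an empty list.
    static List<Object> listField(Map<String, Object> source, String name) {
        Object value = source.get(name);
        if (value instanceof List) {
            return new ArrayList<Object>((List<?>) value);
        }
        if (value == null) {
            return Collections.<Object>emptyList();
        }
        return Collections.<Object>singletonList(value);
    }

    public static void main(String[] args) {
        Map<String, Object> sourceAsMap = new HashMap<>();
        sourceAsMap.put("title", "In which order are my Elasticsearch queries executed?");
        sourceAsMap.put("user", Arrays.asList("kimchy", "luca"));
        System.out.println(stringField(sourceAsMap, "title"));
        System.out.println(listField(sourceAsMap, "user"));
        System.out.println(listField(sourceAsMap, "missing"));
    }
}
```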

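Retrieving highlighting

Highlighted text fragments can be retrieved from each SearchHit in the result. The SearchHit provides access to HighlightField instances, each of which contains one or more highlighted text fragments: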

SearchHits hits = searchResponse.getHits();
for (SearchHit hit : hits.getHits()) {
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
HighlightField highlight = highlightFields.get("title");
Text[] fragments = highlight.fragments();
String fragmentString = fragments[0].string();
}

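Retrieving aggregations

Aggregations can be retrieved from the SearchResponse by first getting the root of the aggregation tree, the Aggregations object, and then getting an aggregation by name: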

Aggregations aggregations = searchResponse.getAggregations();
Terms byCompanyAggregation = aggregations.get("by_company");
Bucket elasticBucket = byCompanyAggregation.getBucketByKey("Elastic");
Avg averageAge = elasticBucket.getAggregations().get("average_age");
double avg = averageAge.getValue();

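Note that when accessing aggregations by name, you need to specify the aggregation interface according to the type of aggregation that was requested, otherwise an exception is thrown:

Range range = aggregations.get("by_company"); // throws an exception because "by_company" is a terms aggregation, but it is retrieved here as a range aggregation

It is also possible to access all aggregations as a map keyed by aggregation name. In this case, the cast to the proper aggregation interface needs to be explicit: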

Map<String, Aggregation> aggregationMap = aggregations.getAsMap();
Terms companyAggregation = (Terms) aggregationMap.get("by_company");

There is also a getter that returns all top-level aggregations as a list:

List<Aggregation> aggregationList = aggregations.asList();

Finally, you can iterate over all aggregations and then decide how to further process each one based on its type:

for (Aggregation agg : aggregations) {
String type = agg.getType();
if (type.equals(TermsAggregationBuilder.NAME)) {
Bucket elasticBucket = ((Terms) agg).getBucketByKey("Elastic");
long numberOfDocs = elasticBucket.getDocCount();
}
}

Retrieving suggestions

To get back the suggestions from a SearchResponse, use the Suggest object as an entry point and then retrieve the nested suggestion objects:

Suggest suggest = searchResponse.getSuggest();
TermSuggestion termSuggestion = suggest.getSuggestion("suggest_user");
for (TermSuggestion.Entry entry : termSuggestion.getEntries()) {
for (TermSuggestion.Entry.Option option : entry) {
String suggestText = option.getText().string();
}
}

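Retrieving profiling results

Profiling results are retrieved from a SearchResponse using the getProfileResults() method. This method returns a Map containing a ProfileShardResult object for every shard involved in the SearchRequest execution. Each ProfileShardResult is stored in the Map under a key that uniquely identifies the shard the profile result corresponds to.

The following sample code shows how to iterate over all the profiling results of each shard: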

Map<String, ProfileShardResult> profilingResults =
searchResponse.getProfileResults();
for (Map.Entry<String, ProfileShardResult> profilingResult : profilingResults.entrySet()) {
String key = profilingResult.getKey();
ProfileShardResult profileShardResult = profilingResult.getValue();
}

The ProfileShardResult object itself contains one or more QueryProfileShardResult objects:

List<QueryProfileShardResult> queryProfileShardResults =
profileShardResult.getQueryProfileResults();
for (QueryProfileShardResult queryProfileResult : queryProfileShardResults) {

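}

Each QueryProfileShardResult gives access to the detailed query tree execution, returned as a list of ProfileResult objects. The following loop belongs inside the loop above, where queryProfileResult is in scope:
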
for (ProfileResult profileResult : queryProfileResult.getQueryResults()) {
String queryName = profileResult.getQueryName();
long queryTimeInMillis = profileResult.getTime();
List<ProfileResult> profiledChildren = profileResult.getProfiledChildren();
}