您的位置:首页 > 其它

ElasticSearch源码解析(四):检索(Search)

2016-08-05 13:58 399 查看
聊ElasticSearch的检索过程,首先展示一个简单的java检索例子:

<span style="white-space:pre">	</span><p>Client client = new TransportClient.Builder()
.settings(Settings.settingsBuilder().put("discovery.type", "zen") //发现集群方式
.put("discovery.zen.minimum_master_nodes", 2) //最少有2个master存在
.put("discovery.zen.ping_timeout", "200ms") //集群ping时间,太小可能会因为网络通信而导致不能发现集群
.build())
.build()
.addTransportAddress(new InetSocketTransportAddress(new InetSocketAddress("localhost", 9300)));</p><span style="white-space:pre">	</span>SearchResponse response = client.prepareSearch("users")
.setTypes("user")
.setSearchType(SearchType.DFS_QUERY_THEN_FETCH)
.setQuery(QueryBuilders.termQuery("name", "fox")) // Query
.setFilter(FilterBuilders.rangeFilter("age").from(20).to(30)) // Filter
.setFrom(0).setSize(60).setExplain(true).execute().actionGet();
SearchHits hits = response.getHits();
for (int i = 0; i < hits.getHits().length; i++) {
System.out.println(hits.getHits()[i].getSourceAsString());
}

检索的第一步是对集群的工作方式进行一些必要的设置,Settings即是对elasticsearch配置的封装,它使用了建造者模式(Builder Pattern)。

public static Builder builder() {
return new Builder();
}

/**
* Returns a builder to be used in order to build settings.
*/
public static Builder settingsBuilder() {
return new Builder();
}

/**
* A builder allowing to put different settings and then {@link #build()} an immutable
* settings implementation. Use {@link Settings#settingsBuilder()} in order to
* construct it.
*/
public static class Builder {

public static final Settings EMPTY_SETTINGS = new Builder().build();

private final Map<String, String> map = new LinkedHashMap<>();

private Builder() {

}
/**
* Builds a {@link Settings} (underlying uses {@link Settings}) based on everything
* set on this builder.
*/
public Settings build() {
return new Settings(Collections.unmodifiableMap(map));
}
}

具体的配置属性可以参照官方文档去设置,这里就不一一赘述了。

创建好配置需要将其添加到客户端,这里我们使用TransportClient的方式通过socket的方式与服务器直连,TransportClient是一种轻量级的连接方式,它使用netty通讯框架与服务器通讯,易于维护。

TransportClient的源码使用了建造者模式生成客户端对象。

public static class Builder {
/**
* The settings to configure the transport client with.
*/
public Builder settings(Settings.Builder settings) {
return settings(settings.build());
}
/**
* Builds a new instance of the transport client.
*/
public TransportClient build() {
Settings settings = InternalSettingsPreparer.prepareSettings(this.settings);
settings = settingsBuilder()
.put(NettyTransport.PING_SCHEDULE, "5s") // 5秒ping一次服务器,心跳机制
.put(settings)
.put("network.server", false)
.put("node.client", true)//以客户端的方式进行连接
.put(CLIENT_TYPE_SETTING, CLIENT_TYPE)
.build();

PluginsService pluginsService = new PluginsService(settings, null, null, pluginClasses);
this.settings = pluginsService.updatedSettings();

Version version = Version.CURRENT;

final ThreadPool threadPool = new ThreadPool(settings);//创建线程池
NamedWriteableRegistry namedWriteableRegistry = new NamedWriteableRegistry();

boolean success = false;
try {
ModulesBuilder modules = new ModulesBuilder();
modules.add(new Version.Module(version));
//添加模块
// plugin modules must be added here, before others or we can get crazy injection errors...
for (Module pluginModule : pluginsService.nodeModules()) {
modules.add(pluginModule);
}
modules.add(new PluginsModule(pluginsService));
modules.add(new SettingsModule(this.settings));
modules.add(new NetworkModule(namedWriteableRegistry));
modules.add(new ClusterNameModule(this.settings));
modules.add(new ThreadPoolModule(threadPool));
modules.add(new TransportModule(this.settings, namedWriteableRegistry));
modules.add(new SearchModule() {
@Override
protected void configure() {
// noop
}
});
modules.add(new ActionModule(true));
modules.add(new ClientTransportModule());
modules.add(new CircuitBreakerModule(this.settings));

pluginsService.processModules(modules);

Injector injector = modules.createInjector();
final TransportService transportService = injector.getInstance(TransportService.class);
transportService.start();
transportService.acceptIncomingRequests();

TransportClient transportClient = new TransportClient(injector);
success = true;
return transportClient;
} finally {
if (!success) {
ThreadPool.terminate(threadPool, 10, TimeUnit.SECONDS);
}
}
}

创建好客户端后,我们就可以进行查询了,首先进行一些查询方式的设置,client.prepareSearch对查询进行一些预处理,主要是设置索引indices对象和创建查询请求。然后setTypes()设置文档类型,setSearchType()设置检索行为,与搜索精度有关(比如更精确的计算词频并排序)。紧接着设置过滤器,查询对象(符合Query DSL灵域专用语言),设置分页,执行查询。然后返回一组结果SearchHits。它包含一组SearchHit对象,每个对象包括词型,得分,原文,索引,高亮显示等属性。

public interface SearchHit extends Streamable, ToXContent, Iterable<SearchHitField> {

/**
* The score.
*/
float score();

/**
* The score.
*/
float getScore();

/**
* The index of the hit.
*/
String index();

/**
* The index of the hit.
*/
String getIndex();

/**
* The id of the document.
*/
String id();

/**
* The id of the document.
*/
String getId();

/**
* The type of the document.
*/
String type();

/**
* The type of the document.
*/
String getType();

/**
* If this is a nested hit then nested reference information is returned otherwise <code>null</code> is returned.
*/
NestedIdentity getNestedIdentity();

/**
* The version of the hit.
*/
long version();

/**
* The version of the hit.
*/
long getVersion();

/**
* Returns bytes reference, also un compress the source if needed.
*/
BytesReference sourceRef();

/**
* Returns bytes reference, also un compress the source if needed.
*/
BytesReference getSourceRef();

/**
* The source of the document (can be <tt>null</tt>). Note, its a copy of the source
* into a byte array, consider using {@link #sourceRef()} so there won't be a need to copy.
*/
byte[] source();

/**
* Is the source empty (not available) or not.
*/
boolean isSourceEmpty();

/**
* The source of the document as a map (can be <tt>null</tt>).
*/
Map<String, Object> getSource();

/**
* The source of the document as string (can be <tt>null</tt>).
*/
String sourceAsString();

/**
* The source of the document as string (can be <tt>null</tt>).
*/
String getSourceAsString();

/**
* The source of the document as a map (can be <tt>null</tt>).
*/
Map<String, Object> sourceAsMap() throws ElasticsearchParseException;

/**
* If enabled, the explanation of the search hit.
*/
Explanation explanation();

/**
* If enabled, the explanation of the search hit.
*/
Explanation getExplanation();

/**
* The hit field matching the given field name.
*/
public SearchHitField field(String fieldName);

/**
* A map of hit fields (from field name to hit fields) if additional fields
* were required to be loaded.
*/
Map<String, SearchHitField> fields();

/**
* A map of hit fields (from field name to hit fields) if additional fields
* were required to be loaded.
*/
Map<String, SearchHitField> getFields();

/**
* A map of highlighted fields.
*/
Map<String, HighlightField> highlightFields();

/**
* A map of highlighted fields.
*/
Map<String, HighlightField> getHighlightFields();

/**
* An array of the sort values used.
*/
Object[] sortValues();

/**
* An array of the sort values used.
*/
Object[] getSortValues();

/**
* The set of query and filter names the query matched with. Mainly makes sense for compound filters and queries.
*/
String[] matchedQueries();

/**
* The set of query and filter names the query matched with. Mainly makes sense for compound filters and queries.
*/
String[] getMatchedQueries();

/**
* The shard of the search hit.
*/
SearchShardTarget shard();

/**
* The shard of the search hit.
*/
SearchShardTarget getShard();

/**
* @return Inner hits or <code>null</code> if there are none
*/
Map<String, SearchHits> getInnerHits();

/**
* Encapsulates the nested identity of a hit.
*/
public interface NestedIdentity {

/**
* Returns the nested field in the source this hit originates from
*/
public Text getField();

/**
* Returns the offset in the nested array of objects in the source this hit
*/
public int getOffset();

/**
* Returns the next child nested level if there is any, otherwise <code>null</code> is returned.
*
* In the case of mappings with multiple levels of nested object fields
*/
public NestedIdentity getChild();
}
}
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息