您的位置:首页 > 编程语言 > Java开发

Elasticsearch JAVA API

2017-02-09 16:01 176 查看
参考文献:https://www.elastic.co/guide/en/elasticsearch/client/java-api/current/index.html

Maven 工程 pom.xml修改

<!-- Maven dependency on the Elasticsearch transport client artifact, version 5.2.0. -->
<dependency>

    <groupId>org.elasticsearch.client</groupId>

    <artifactId>transport</artifactId>

    <version>5.2.0</version>

</dependency>

//client api

通过TransportClient连接ES cluster

// On startup: build a TransportClient from empty settings and register two
// transport addresses (host1 and host2, both on port 9300).

TransportClient client = new PreBuiltTransportClient(Settings.EMPTY)

        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host1"), 9300))

        .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("host2"), 9300));

// On shutdown: close the client.

client.close();

//设置集群名称

// Build settings that carry "cluster.name" and hand them to the client constructor.
Settings settings = Settings.builder()

        .put("cluster.name", "myClusterName").build();

TransportClient client = new PreBuiltTransportClient(settings);

传输客户端(TransportClient)带有集群嗅探功能,可以动态地添加新节点、移除旧节点。启用嗅探后,传输客户端会先连接到其内部节点列表中的节点,该列表由 addTransportAddress 调用构建。随后,客户端会在这些节点上调用内部的集群状态

API 来发现可用的数据节点,并用这些数据节点替换内部节点列表。该列表默认每 5 秒刷新一次。注意:嗅探器连接所使用的 IP 地址,是这些节点在 Elasticsearch 配置中声明的发布(publish)地址。

启用嗅探

Settings settings = Settings.settingsBuilder()

        .put("client.transport.sniff", true).build();

TransportClient client = new PreBuiltTransportClient(settings);

//Document APIs

Index API: 创建并建立索引

Get API: 获取文档

DELETE API: 删除文档

UPDATE API: 更新文档

Multi Get API: 一次批量获取文档

Bulk API: 批量操作,一次请求中可以执行索引、更新、删除(不包含查询)

//Index API

导入模块

import static org.elasticsearch.common.xcontent.XContentFactory.*;

// Index a JSON document built with jsonBuilder() and obtain the IndexResponse via get().
// NOTE(review): the three prepareIndex arguments are presumably index, type and id — confirm.
IndexResponse response = client.prepareIndex("twitter", "tweet", "1")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "trying out Elasticsearch")

                    .endObject()

                  )

        .get();

//GET API

// Get the document addressed by ("twitter", "tweet", "1") and obtain the GetResponse.
GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

//Delete API

// Delete the document addressed by ("twitter", "tweet", "1") and obtain the DeleteResponse.
DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

指定删除(删除查询)

// Delete-by-query: filter(...) supplies a match query on field "gender",
// source(...) names "persons", and get() yields the BulkIndexByScrollResponse.
BulkIndexByScrollResponse response =

    DeleteByQueryAction.INSTANCE.newRequestBuilder(client)

        .filter(QueryBuilders.matchQuery("gender", "male"))

        .source("persons")                                  // presumably the index to delete from — confirm

        .get();                                             

// Read the deleted count reported by the response.
long deleted = response.getDeleted();

异步方式:

// Same delete-by-query request, executed with an ActionListener callback
// instead of calling get() (the asynchronous variant).
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)

    .filter(QueryBuilders.matchQuery("gender", "male"))                  

    .source("persons")                                                   

    .execute(new ActionListener<BulkIndexByScrollResponse>() {           // listener receives the response or the exception

        @Override

        public void onResponse(BulkIndexByScrollResponse response) {

            long deleted = response.getDeleted();                        // deleted count reported by the response

        }

        @Override

        public void onFailure(Exception e) {

            // Handle the exception

        }

    });

//Update API

DeleteByQueryAction.INSTANCE.newRequestBuilder(client)

    .filter(QueryBuilders.matchQuery("gender", "male"))                  

    .source("persons")                                                   

    .execute(new ActionListener<BulkIndexByScrollResponse>() {           

        @Override

        public void onResponse(BulkIndexByScrollResponse response) {

            long deleted = response.getDeleted();                        

        }

        @Override

        public void onFailure(Exception e) {

            // Handle the exception

        }

    });

或者使用 prepareUpdate() 方法

// Update through prepareUpdate() with an inline script that assigns ctx._source.gender.
client.prepareUpdate("ttl", "doc", "1")

        .setScript(new Script("ctx._source.gender = \"male\""  , ScriptService.ScriptType.INLINE, null, null))

        .get();

// Update through prepareUpdate() with a partial document instead of a script.
client.prepareUpdate("ttl", "doc", "1")

        .setDoc(jsonBuilder()               

            .startObject()

                .field("gender", "male")

            .endObject())

        .get();

更新合并

// Build an UpdateRequest whose doc(...) holds the field to change,
// then send it with client.update() and obtain the result via get().
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")

        .doc(jsonBuilder()

            .startObject()

                .field("gender", "male")

            .endObject());

client.update(updateRequest).get();

也支持upsert,就是没有就创建

// Upsert: the IndexRequest below is attached to the UpdateRequest via upsert(...).
// Per the surrounding text, it is used to create the document when none exists yet.
IndexRequest indexRequest = new IndexRequest("index", "type", "1")

        .source(jsonBuilder()

            .startObject()

                .field("name", "Joe Smith")

                .field("gender", "male")

            .endObject());

// The update itself: a partial document plus the upsert fallback.
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")

        .doc(jsonBuilder()

            .startObject()

                .field("gender", "male")

            .endObject())

        .upsert(indexRequest);              

client.update(updateRequest).get();

//Multi Get API 批量获取

// Fetch several documents in a single multi-get request.
MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
    .add("twitter", "tweet", "1")            // one id
    .add("twitter", "tweet", "2", "3", "4")  // several ids in one add(...) call
    .add("another", "type", "foo")           // a second ("another", "type") target
    .get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) {
    // A failed item has no GetResponse (getResponse() returns null), so skip it
    // rather than dereferencing null below.
    if (itemResponse.isFailed()) {
        continue;
    }
    GetResponse response = itemResponse.getResponse();
    if (response.isExists()) {
        // Source of the found document as a string.
        String json = response.getSourceAsString();
    }
}

//Bulk API 索引和删除在一个请求

import static org.elasticsearch.common.xcontent.XContentFactory.*;

// Collect two index requests in one BulkRequestBuilder and send them together.
BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests

// First index request: ("twitter", "tweet", "1").
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "trying out Elasticsearch")

                    .endObject()

                  )

        );

// Second index request: ("twitter", "tweet", "2").
bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")

        .setSource(jsonBuilder()

                    .startObject()

                        .field("user", "kimchy")

                        .field("postDate", new Date())

                        .field("message", "another post")

                    .endObject()

                  )

        );

// Send the accumulated requests and obtain the BulkResponse.
BulkResponse bulkResponse = bulkRequest.get();

if (bulkResponse.hasFailures()) {

    // process failures by iterating through each bulk response item

}

BulkProcessor 类提供了一个简单的接口,可以根据请求的数量、请求的大小,或者在给定的时间间隔之后,自动刷新(flush)批量操作。

import org.elasticsearch.action.bulk.BackoffPolicy;

import org.elasticsearch.action.bulk.BulkProcessor;

import org.elasticsearch.common.unit.ByteSizeUnit;

import org.elasticsearch.common.unit.ByteSizeValue;

import org.elasticsearch.common.unit.TimeValue;

// Build a BulkProcessor from the client and a listener, then configure its
// flush thresholds and backoff policy. The "{ ... }" bodies are placeholders.
BulkProcessor bulkProcessor = BulkProcessor.builder(

        client,  

        new BulkProcessor.Listener() {

            // Callback that receives the execution id and the request (named "before bulk").
            @Override

            public void beforeBulk(long executionId,

                                   BulkRequest request) { ... }

            // Callback that receives the request together with its BulkResponse.
            @Override

            public void afterBulk(long executionId,

                                  BulkRequest request,

                                  BulkResponse response) { ... }

            // Callback that receives the request together with a Throwable failure.
            @Override

            public void afterBulk(long executionId,

                                  BulkRequest request,

                                  Throwable failure) { ... }

        })

        // Action-count setting: 10000.
        .setBulkActions(10000)

        // Size setting: 5 MB.
        .setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB))

        // Flush interval: 5 seconds.
        .setFlushInterval(TimeValue.timeValueSeconds(5))

        // Concurrent requests: 1.
        .setConcurrentRequests(1)

        // Exponential backoff built from 100 ms and 3 (presumably initial delay and max retries — confirm).
        .setBackoffPolicy(

            BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))

        .build();

//Search API

先看一个例子,使用 client.prepareSearch() 返回的 SearchRequestBuilder

import org.elasticsearch.action.search.SearchResponse;

import org.elasticsearch.action.search.SearchType;

import org.elasticsearch.index.query.QueryBuilders.*;

SearchResponse response = client.prepareSearch("index1", "index2")

        .setTypes("type1", "type2")

        .setSearchType(SearchType.DFS_QUERY_THEN_FETCH)

        .setQuery(QueryBuilders.termQuery("multi", "test"))                 // Query

        .setPostFilter(QueryBuilders.rangeQuery("age").from(12).to(18))     // Filter

        .setFrom(0).setSize(60).setExplain(true)

        .get();

//MultiSearch API

// First search: query-string query "elasticsearch", size 1.
SearchRequestBuilder srb1 = client

    .prepareSearch().setQuery(QueryBuilders.queryStringQuery("elasticsearch")).setSize(1);

// Second search: match query on "name", size 1.
SearchRequestBuilder srb2 = client

    .prepareSearch().setQuery(QueryBuilders.matchQuery("name", "kimchy")).setSize(1);

// Add both searches to one multi-search request and obtain the MultiSearchResponse.
MultiSearchResponse sr = client.prepareMultiSearch()

        .add(srb1)

        .add(srb2)

        .get();

You will get all individual responses from MultiSearchResponse#getResponses()

// Sum the total hits over all individual search responses.
long nbHits = 0;
for (MultiSearchResponse.Item item : sr.getResponses()) {
    // A failed item has no SearchResponse (getResponse() returns null), so skip it
    // rather than dereferencing null below.
    if (item.isFailure()) {
        continue;
    }
    SearchResponse response = item.getResponse();
    nbHits += response.getHits().getTotalHits();
}

//Using Aggregations 聚合查询

// Match-all search carrying two aggregations: a terms aggregation named "agg1"
// on "field" and a yearly date histogram named "agg2" on "birth".
SearchResponse sr = client.prepareSearch()

    .setQuery(QueryBuilders.matchAllQuery())

    .addAggregation(

            AggregationBuilders.terms("agg1").field("field")

    )

    .addAggregation(

            AggregationBuilders.dateHistogram("agg2")

                    .field("birth")

                    .dateHistogramInterval(DateHistogramInterval.YEAR)

    )

    .get();

Get your aggregation results

// Read the aggregation results back by the names used when they were added.
Terms agg1 = sr.getAggregations().get("agg1");
// Date histogram results are read through the Histogram interface; the
// DateHistogram type is not part of the 5.x client.
Histogram agg2 = sr.getAggregations().get("agg2");

//Structuring aggregations

聚合条件

日期直方图聚合

平均聚合

SearchResponse sr = node.client().prepareSearch()

    .addAggregation(

        AggregationBuilders.terms("by_country").field("country")

        .subAggregation(AggregationBuilders.dateHistogram("by_year")

            .field("dateOfBirth")

            .dateHistogramInterval(DateHistogramInterval.YEAR)

            .subAggregation(AggregationBuilders.avg("avg_children").field("children"))

        )

    )

    .execute().actionGet();

//Java API Administration

//Indices Administration

// Obtain the indices admin client from the client's admin() entry point.
IndicesAdminClient indicesAdminClient = client.admin().indices();

创建index

// Create the index "twitter" without extra settings.
client.admin().indices().prepareCreate("twitter").get();

设置setting:

// Create the index "twitter" with explicit shard and replica settings.
client.admin().indices().prepareCreate("twitter")

        .setSettings(Settings.builder()             // settings applied to the new index

                .put("index.number_of_shards", 3)

                .put("index.number_of_replicas", 2)

        )

        .get();

为索引建mapping

client.admin().indices().prepareCreate("twitter")   

        .addMapping("tweet", "{\n" +                

                "    \"tweet\": {\n" +

                "      \"properties\": {\n" +

                "        \"message\": {\n" +

                "          \"type\": \"string\"\n" +

                "        }\n" +

                "      }\n" +

                "    }\n" +

                "  }")

        .get();

检索设置:

// Fetch the settings for "company" and "employee", then walk the
// index-name -> Settings entries of the response.
GetSettingsResponse response = client.admin().indices()

        .prepareGetSettings("company", "employee").get();                           

for (ObjectObjectCursor<String, Settings> cursor : response.getIndexToSettings()) {

    String index = cursor.key;                                                      // entry key

    Settings settings = cursor.value;                                               // entry value

    // Both lookups pass null as the second argument (presumably the default when the key is absent — confirm).
    Integer shards = settings.getAsInt("index.number_of_shards", null);             

    Integer replicas = settings.getAsInt("index.number_of_replicas", null);         

}

//Cluster Administration

// Obtain the cluster admin client from the client's admin() entry point.
ClusterAdminClient clusterAdminClient = client.admin().cluster();

集群健康

// Request cluster health, then read cluster-level values from the response.
ClusterHealthResponse healths = client.admin().cluster().prepareHealth().get();

String clusterName = healths.getClusterName();              

int numberOfDataNodes = healths.getNumberOfDataNodes();     

int numberOfNodes = healths.getNumberOfNodes();             

// Iterate the per-index health entries and read their values.
for (ClusterIndexHealth health : healths.getIndices().values()) {

    String index = health.getIndex();                       

    int numberOfShards = health.getNumberOfShards();        

    int numberOfReplicas = health.getNumberOfReplicas();    

    ClusterHealthStatus status = health.getStatus();        

}

等待集群达到指定状态

// Health request with no index argument, waiting for yellow status.
client.admin().cluster().prepareHealth()            

        .setWaitForYellowStatus()                   

        .get();

// Health request for "company", waiting for green status.
client.admin().cluster().prepareHealth("company")   

        .setWaitForGreenStatus()                    

        .get();

// Health request for "employee", waiting for green status with a 2-second timeout.
client.admin().cluster().prepareHealth("employee")  

        .setWaitForGreenStatus()                    

        .setTimeout(TimeValue.timeValueSeconds(2))  
        .get();

【原创】原创文章,更多关注敬请关注微信公众号。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息