您的位置:首页 > 编程语言 > Java开发

Elasticsearch通过Java创建索引、Mapping以及数据的增删改查操作

2018-07-10 11:57 706 查看
因为目前项目中使用的JDK是1.7版本,而ElasticSearch 5.x版本需要JDK 1.8,所以以下代码都是基于ElasticSearch 2.4.6版本的。这些代码是从项目中提取出来的,与业务相关的部分就不贴出来了,仅供自己日后参考使用;因此如果直接看下面的代码,可能有不少地方看不懂。

引入Maven

<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>2.4.6</version>
</dependency>

枚举工具类

package com.linewell.ccip.utils.es;

import java.lang.reflect.Field;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**
 * Reflection helpers that convert a raw query value into the enum constant
 * expected by a bean field.
 */
public class EnumUtils {

    /** Name of the static factory method an enum may expose to turn a string into a constant. */
    private static final String FROM_STRING = "fromString";

    /**
     * Resolves the value to use when querying on a bean field.
     *
     * <p>Called by {@code EsDataGridUtils.parseQueryInfo} while building "equals" conditions.
     * When the field is declared with an enum type the raw value is converted to the matching
     * enum constant; in every other case (including a field that cannot be found) the raw
     * value is returned untouched.
     *
     * @param entityCls bean class that declares (or inherits) the field
     * @param fieldName field name, matched case-insensitively
     * @param fieldVal  raw value, normally the textual form of an enum constant such as {@code INSERT}
     * @return the enum constant for enum fields ({@code null} when the text cannot be converted),
     *         otherwise {@code fieldVal} itself
     */
    public static Object getValByField(Class<?> entityCls, String fieldName, Object fieldVal) {
        Class<?> fieldCls = getFieldType(entityCls, fieldName);
        // Unknown field or non-enum field: there is nothing to convert.
        if (null == fieldCls || !fieldCls.isEnum()) {
            return fieldVal;
        }
        // Already a constant of the right enum type.
        if (fieldCls.isInstance(fieldVal)) {
            return fieldVal;
        }
        String text = (null == fieldVal) ? null : String.valueOf(fieldVal);
        try {
            return getEnum(fieldCls, text);
        } catch (SecurityException | IllegalArgumentException
                | IllegalAccessException | InvocationTargetException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Converts text to a constant of the given enum class.
     *
     * <p>A public static single-argument {@code fromString} factory on the enum takes
     * precedence; when the enum has none, the standard {@link Enum#valueOf(Class, String)}
     * lookup by constant name is used.
     *
     * @param fieldCls enum class
     * @param fieldVal textual form of the constant
     * @return the enum constant, or {@code null} when no constant matches
     * @throws IllegalAccessException    if the factory method is not accessible
     * @throws InvocationTargetException if the factory method itself throws
     */
    private static Enum<?> getEnum(Class<?> fieldCls, String fieldVal)
            throws IllegalAccessException, InvocationTargetException {
        for (Method method : fieldCls.getMethods()) {
            if (isFromStringFactory(method)) {
                Object converted = method.invoke(null, fieldVal);
                return (converted instanceof Enum) ? (Enum<?>) converted : null;
            }
        }
        return valueOfOrNull(fieldCls, fieldVal);
    }

    /** Tells whether the method is a static {@code fromString}-style factory accepting a String. */
    private static boolean isFromStringFactory(Method method) {
        if (!FROM_STRING.equalsIgnoreCase(method.getName())
                || !java.lang.reflect.Modifier.isStatic(method.getModifiers())) {
            return false;
        }
        Class<?>[] params = method.getParameterTypes();
        return 1 == params.length && params[0].isAssignableFrom(String.class);
    }

    /** Looks a constant up by name, returning {@code null} instead of throwing when it is absent. */
    @SuppressWarnings({"unchecked", "rawtypes"})
    private static Enum<?> valueOfOrNull(Class<?> enumCls, String name) {
        if (null == name) {
            return null;
        }
        try {
            // Safe: callers only pass classes for which isEnum() is true.
            return Enum.valueOf((Class<Enum>) enumCls, name);
        } catch (IllegalArgumentException ignored) {
            // No constant with that name: report "not convertible" to the caller.
            return null;
        }
    }

    /**
     * Finds the declared type of a field, searching the class and all of its superclasses.
     *
     * @param cls           class to inspect
     * @param findFieldName field name, matched case-insensitively
     * @return the field type, or {@code null} when no such field exists
     */
    private static Class<?> getFieldType(Class<?> cls, String findFieldName) {
        if (null == cls || null == findFieldName) {
            return null;
        }
        try {
            for (Field field : getFields(cls)) {
                if (findFieldName.equalsIgnoreCase(field.getName())) {
                    return field.getType();
                }
            }
        } catch (SecurityException e) {
            e.printStackTrace();
        }
        return null;
    }

    /**
     * Collects the fields declared by the class and by each of its superclasses,
     * most-derived class first, each field exactly once.
     */
    private static List<Field> getFields(Class<?> objClass) {
        List<Field> fieldList = new ArrayList<>();
        for (Class<?> current = objClass; null != current; current = current.getSuperclass()) {
            fieldList.addAll(Arrays.asList(current.getDeclaredFields()));
        }
        return fieldList;
    }
}

ElasticSearch工具类

package com.linewell.ccip.utils.es;

import com.google.common.collect.Lists;
import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsRequest;
import org.elasticsearch.action.admin.indices.exists.indices.IndicesExistsResponse;
import org.elasticsearch.action.admin.indices.mapping.put.PutMappingRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.IndicesAdminClient;
import org.elasticsearch.client.Requests;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.cluster.metadata.MappingMetaData;
import org.elasticsearch.common.collect.ImmutableOpenMap;
import org.elasticsearch.common.compress.CompressedXContent;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;

import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Modifier;
import java.net.InetAddress;
import java.util.*;

import static org.elasticsearch.common.xcontent.XContentFactory.jsonBuilder;

/**
 * Elasticsearch helper: owns the shared transport client and creates the index
 * and per-type mappings on demand.
 *
 * @author zchuanzhao
 * 2018/7/3
 */
public class EsHandler {
    /**
     * Names of the fields that are indexed as analyzed (tokenized) text.
     */
    private static final List<String> ANALYZED_FIELD = Lists.newArrayList("userName", "authType");
    /**
     * Value of the cluster.name setting. The same string is also used as the index name
     * by every request built in this class.
     */
    public static final String ES_CLUSTER_NAME = "logdb";
    /**
     * Host of the Elasticsearch node.
     */
    private static final String ES_HOST = "localhost";
    /**
     * Transport (TCP) port of the Elasticsearch node.
     */
    private static final int ES_TCP_PORT = 9300;

    public ThreadLocal<SearchResponse> responseThreadLocal = new ThreadLocal<>();

    /**
     * Cache of mapping sources already known to exist, keyed by type name.
     * Read outside the class lock in {@link #mapping(Object)}, hence a concurrent map.
     */
    private static final Map<String, String> MAPPINGS = new java.util.concurrent.ConcurrentHashMap<>();

    // For a Shield-secured cluster, additionally put "shield.user" = "user:password" into the settings.
    static Settings settings = Settings.settingsBuilder().put("cluster.name", ES_CLUSTER_NAME).build();

    /**
     * Shared transport client; stays {@code null} when the connection could not be set up.
     */
    private static TransportClient client;

    static {
        try {
            client = TransportClient.builder().settings(settings).build()
                    .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ES_HOST), ES_TCP_PORT));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    /**
     * Returns the shared transport client.
     *
     * @return the client, or {@code null} when initialization failed
     */
    public static TransportClient getClient() {
        return client;
    }

    /**
     * Closes the given client; a {@code null} argument is ignored.
     *
     * @param client client to close
     */
    public static void close(TransportClient client) {
        if (null != client) {
            client.close();
        }
    }

    /**
     * Makes sure the index exists and that a mapping exists for the object's type
     * (the lower-cased simple class name), creating either one when missing.
     *
     * @param obj sample object whose class determines the type and its fields
     */
    public static void mapping(Object obj) {
        String type = obj.getClass().getSimpleName().toLowerCase();
        if (MAPPINGS.containsKey(type)) {
            return;
        }
        synchronized (EsHandler.class) {
            // Another thread may have finished the work while this one waited for the lock.
            if (MAPPINGS.containsKey(type)) {
                return;
            }
            createIndex();
            // Fetch every mapping currently registered on the index.
            ImmutableOpenMap<String, MappingMetaData> mappings = client.admin().cluster().prepareState().execute()
                    .actionGet().getState().getMetaData().getIndices().get(ES_CLUSTER_NAME).getMappings();
            MappingMetaData mmd = mappings.get(type);
            CompressedXContent xContent = (null == mmd) ? null : mmd.source();
            if (null == xContent) {
                createMapping(obj);
            } else {
                MAPPINGS.put(type, xContent.toString());
            }
        }
    }

    /**
     * Registers the mapping derived from the object's fields and waits for the response.
     *
     * @param obj sample object whose class determines the type and its fields
     */
    public static void createMapping(Object obj) {
        String type = obj.getClass().getSimpleName().toLowerCase();
        PutMappingRequest mapping = Requests.putMappingRequest(ES_CLUSTER_NAME).type(type).source(setMapping(obj));
        EsHandler.getClient().admin().indices().putMapping(mapping).actionGet();
    }

    /**
     * Builds the mapping definition for an object: one property per non-static field of the
     * class and its superclasses. Fields listed in {@code ANALYZED_FIELD} are analyzed with
     * the IK tokenizer, all others are indexed as not_analyzed.
     *
     * @param obj sample object
     * @return the mapping source
     * @throws IllegalStateException if the JSON builder fails
     */
    public static XContentBuilder setMapping(Object obj) {
        try {
            XContentBuilder mapping = jsonBuilder().startObject().startObject("properties");
            // A field shadowed in a subclass must be written only once, otherwise the JSON has duplicate keys.
            Set<String> written = new HashSet<>();
            for (Field field : getFields(obj)) {
                // Static fields are not part of the document.
                if (Modifier.isStatic(field.getModifiers())) {
                    continue;
                }
                String name = field.getName();
                if (!written.add(name)) {
                    continue;
                }
                String esType = getElasticSearchMappingType(field.getType().getSimpleName().toLowerCase());
                mapping.startObject(name).field("type", esType);
                if (ANALYZED_FIELD.contains(name)) {
                    mapping.field("index", "analyzed")
                            // Use the IK analyzer for both indexing and searching.
                            .field("analyzer", "ik_max_word")
                            .field("search_analyzer", "ik_max_word");
                } else {
                    mapping.field("index", "not_analyzed");
                }
                mapping.endObject();
            }
            return mapping.endObject().endObject();
        } catch (IOException e) {
            throw new IllegalStateException("Failed to build mapping for " + obj.getClass().getName(), e);
        }
    }

    /**
     * Collects the fields declared by the object's class and by each of its superclasses,
     * most-derived class first, each field exactly once.
     *
     * @param obj object to inspect
     * @return the declared fields
     */
    private static List<Field> getFields(Object obj) {
        List<Field> fieldList = new ArrayList<>();
        for (Class<?> current = obj.getClass(); null != current; current = current.getSuperclass()) {
            fieldList.addAll(Arrays.asList(current.getDeclaredFields()));
        }
        return fieldList;
    }

    /**
     * Maps a lower-cased Java simple type name to an Elasticsearch field type.
     *
     * @param varType lower-cased simple name of the Java type, e.g. {@code "long"} or {@code "date"}
     * @return the Elasticsearch type; {@code "string"} for anything not listed
     */
    private static String getElasticSearchMappingType(String varType) {
        switch (varType) {
            case "date":
                return "date";
            case "double":
                return "double";
            case "long":
            case "int":
            case "integer":
                // "integer" is the lower-cased simple name of java.lang.Integer; treat it like int.
                return "long";
            default:
                return "string";
        }
    }

    /**
     * Tells whether the index already exists.
     */
    private static boolean indexExists() {
        IndicesAdminClient adminClient = client.admin().indices();
        IndicesExistsRequest request = new IndicesExistsRequest(ES_CLUSTER_NAME);
        IndicesExistsResponse response = adminClient.exists(request).actionGet();
        return response.isExists();
    }

    /**
     * Creates the index when it does not exist yet and waits for the response, so that
     * callers can read the index metadata right afterwards.
     */
    private static void createIndex() {
        if (!indexExists()) {
            CreateIndexRequest request = new CreateIndexRequest(ES_CLUSTER_NAME);
            client.admin().indices().create(request).actionGet();
        }
    }
}

ElasticSearch查询条件操作工具类

package com.linewell.ccip.utils.es;

import com.linewell.ccip.common.bean.datalist.engine.QueryInfo;
import net.sf.json.JSONObject;
import org.apache.commons.lang.StringUtils;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.index.query.RangeQueryBuilder;
import org.elasticsearch.search.sort.SortOrder;

import java.util.Map;
import java.util.Set;

/**
 * Translates the generic grid query parameters (conditions, paging, sorting)
 * into an Elasticsearch search request.
 * @author zchuanzhao
 * 2018/7/5
 */
public class EsDataGridUtils {

    /**
     * Request key holding the sort field.
     */
    private static final String SORT_FIELD = "sortField";

    /**
     * Request key holding the sort direction.
     */
    private static final String SORT_ORDER = "sortOrder";

    /**
     * Request key holding the JSON-encoded query conditions.
     */
    private static final String QUERY_KEY = "query";

    /**
     * Query key of the "equals" conditions.
     */
    private static final String EQUALS_KEY = "equals";

    /**
     * Query key of the "like" conditions.
     */
    private static final String LIKE_KEY = "like";

    /**
     * Query key of the "greater than or equal" conditions.
     */
    private static final String GREATER_EQUALS_KEY = "ge";

    /**
     * Query key of the "less than or equal" conditions.
     */
    private static final String LESS_EQUALS_KEY = "le";

    /**
     * Query key of the object describing how conditions are combined.
     */
    private static final String SELECT_KEY = "select";

    /**
     * Key inside the "select" object: true combines conditions with AND.
     */
    private static final String AND_KEY = "and";
    /**
     * Sort direction value meaning ascending.
     */
    private static final String ASC_KEY = "asc";

    /**
     * Applies the query conditions, paging and sorting carried by {@code queryInfo} to the
     * search request.
     *
     * <p>"equals" conditions are always required. "like", "ge" and "le" conditions are added
     * as required clauses when the query's {@code select.and} flag is true and as optional
     * ("should") clauses otherwise; a missing flag counts as false. Results are sorted
     * descending unless the request asks for {@code asc}; no sort is added when neither the
     * request nor {@code defaultSort} names a field.
     *
     * @param builder search request to configure
     * @param queryInfo query information (request map, current page number, page size)
     * @param beanClass bean class being queried, used to convert enum-typed "equals" values
     * @param defaultSort sort field used when the request does not name one
     */
    public static void parseQueryInfo(SearchRequestBuilder builder, QueryInfo queryInfo, Class<?> beanClass, String defaultSort) {
        Map<String, String> requestMap = queryInfo.getRequestMap();
        BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();
        SortOrder sortOrder = SortOrder.DESC;
        String sortField = defaultSort;

        if (requestMap != null) {
            if (ASC_KEY.equals(requestMap.get(SORT_ORDER))) {
                sortOrder = SortOrder.ASC;
            }
            String requestedSort = requestMap.get(SORT_FIELD);
            if (StringUtils.isNotEmpty(requestedSort)) {
                sortField = requestedSort;
            }
            String queryStr = requestMap.get(QUERY_KEY);
            if (StringUtils.isNotEmpty(queryStr)) {
                appendConditions(boolQueryBuilder, JSONObject.fromObject(queryStr), beanClass);
            }
        }
        builder.setQuery(boolQueryBuilder);

        // from + size paging: offset of the first hit and number of hits per page.
        builder.setFrom(queryInfo.getCurrentPageNum() * queryInfo.getCountPerPage()).setSize(queryInfo.getCountPerPage());

        // Sorting on a null/empty field name is rejected, so only sort when a field is known.
        if (StringUtils.isNotEmpty(sortField)) {
            builder.addSort(sortField, sortOrder);
        }
    }

    /** Adds the equals / like / ge / le conditions found in the parsed query object. */
    private static void appendConditions(BoolQueryBuilder boolQueryBuilder, JSONObject queryObj, Class<?> beanClass) {
        JSONObject equalsObj = queryObj.optJSONObject(EQUALS_KEY);
        if (equalsObj != null) {
            for (Object key : equalsObj.keySet()) {
                String fieldName = String.valueOf(key);
                // Enum-typed fields are queried with the enum constant, others with the raw value.
                Object value = EnumUtils.getValByField(beanClass, fieldName, equalsObj.opt(fieldName));
                boolQueryBuilder.must(QueryBuilders.matchQuery(fieldName, value));
            }
        }

        JSONObject selectObj = queryObj.optJSONObject(SELECT_KEY);
        // optBoolean tolerates a missing or non-boolean "and" entry instead of throwing.
        boolean queryAnd = selectObj != null && selectObj.optBoolean(AND_KEY, false);

        JSONObject likeObj = queryObj.optJSONObject(LIKE_KEY);
        if (likeObj != null) {
            for (Object key : likeObj.keySet()) {
                String fieldName = String.valueOf(key);
                addClause(boolQueryBuilder, QueryBuilders.termQuery(fieldName, likeObj.optString(fieldName)), queryAnd);
            }
        }

        JSONObject geObj = queryObj.optJSONObject(GREATER_EQUALS_KEY);
        if (geObj != null) {
            for (Object key : geObj.keySet()) {
                String fieldName = String.valueOf(key);
                addClause(boolQueryBuilder, QueryBuilders.rangeQuery(fieldName).gte(geObj.optString(fieldName)), queryAnd);
            }
        }

        JSONObject leObj = queryObj.optJSONObject(LESS_EQUALS_KEY);
        if (leObj != null) {
            for (Object key : leObj.keySet()) {
                String fieldName = String.valueOf(key);
                addClause(boolQueryBuilder, QueryBuilders.rangeQuery(fieldName).lte(leObj.optString(fieldName)), queryAnd);
            }
        }
    }

    /** Adds the clause as required (AND) or optional (OR) depending on the flag. */
    private static void addClause(BoolQueryBuilder boolQueryBuilder, QueryBuilder clause, boolean queryAnd) {
        if (queryAnd) {
            boolQueryBuilder.must(clause);
        } else {
            boolQueryBuilder.should(clause);
        }
    }
}

ElasticSearch操作类接口

package com.linewell.ccip.services.es;

import com.linewell.ccip.common.bean.datalist.engine.QueryInfo;
import com.linewell.ccip.servicesbase.bean.GridData;
import com.linewell.ccip.servicesbase.bean.log.LogBeanEntity;

/**
* Operations for storing and querying objects in Elasticsearch.
* @author zchuanzhao
* 2018/7/2
*/
public interface IEsManager {
/**
* Saves an object.
* @param obj object to save
* @return success flag; in EsManagerImpl this is true when Elasticsearch reports the document as newly created
*/
boolean save(Object obj);

/**
* Updates a previously saved object.
* @param obj object carrying the new values
* @return success flag; in EsManagerImpl this is true when the update did not create a new document
*/
boolean update(Object obj);

/**
* Deletes an object.
* @param unid identifier of the object
* @param beanClass class of the object
* @return success flag; in EsManagerImpl this is true when the document was found
*/
boolean delete(String unid, Class beanClass);

/**
* Fetches a single object.
* @param unid identifier of the object
* @param beanClass class to convert the stored document to
* @param <T> bean type
* @return the object; EsManagerImpl returns null when no document exists
*/
<T extends LogBeanEntity> T get(String unid, Class<T> beanClass);

/**
* Fetches one page of objects.
* @param queryInfo query information (conditions, paging, sorting)
* @param beanClass class to convert the stored documents to
* @param defaultSort sort field used when the query does not name one
* @param <T> bean type
* @return grid data holding the page content and the total hit count
*/
<T extends LogBeanEntity> GridData<T> getList(QueryInfo queryInfo, Class<T> beanClass, String defaultSort);
}

ElasticSearch操作类

package com.linewell.ccip.services.es.impl;

import com.linewell.ccip.common.bean.datalist.engine.QueryInfo;
import com.linewell.ccip.services.es.IEsManager;
import com.linewell.ccip.servicesbase.bean.GridData;
import com.linewell.ccip.servicesbase.bean.log.LogBeanEntity;
import com.linewell.ccip.utils.es.EsHandler;
import com.linewell.ccip.utils.es.EsDataGridUtils;
import net.sf.json.JSONObject;
import org.elasticsearch.action.delete.DeleteResponse;
import org.elasticsearch.action.get.GetResponse;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.action.search.ClearScrollRequest;
import org.elasticsearch.action.search.SearchRequestBuilder;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.action.search.SearchType;
import org.elasticsearch.action.update.UpdateResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.unit.TimeValue;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.SearchHits;
import java.util.ArrayList;
import java.util.List;

/**
 * {@link IEsManager} implementation backed by the shared transport client of {@code EsHandler}.
 * Documents are stored in the index named by {@code EsHandler.ES_CLUSTER_NAME}; the type is
 * the lower-cased simple class name and the document id is the bean's {@code unid}.
 * @author zchuanzhao
 * 2018/7/2
 */
public class EsManagerImpl implements IEsManager {

    /**
     * Transport (TCP) client.
     */
    private final TransportClient client;

    public EsManagerImpl() {
        this.client = EsHandler.getClient();
    }

    /**
     * Indexes the object, creating index and mapping first when necessary.
     * @param obj object to save
     * @return true when Elasticsearch reports the document as newly created
     */
    @Override
    public boolean save(Object obj) {
        EsHandler.mapping(obj);
        JSONObject json = toDocument(obj);
        IndexResponse response = client
                .prepareIndex(EsHandler.ES_CLUSTER_NAME, typeOf(obj.getClass()), String.valueOf(json.get("id")))
                .setSource(json)
                .get();
        return response.isCreated();
    }

    /**
     * Updates the stored document of the object. The document is addressed by the bean's
     * {@code unid}, the same id {@link #save(Object)} indexes it under.
     * @param obj object carrying the new values
     * @return true when the update did not create a new document
     */
    @Override
    public boolean update(Object obj) {
        JSONObject json = toDocument(obj);
        UpdateResponse response = client
                .prepareUpdate(EsHandler.ES_CLUSTER_NAME, typeOf(obj.getClass()), String.valueOf(json.get("id")))
                .setDoc(json)
                .get();
        return !response.isCreated();
    }

    /**
     * Deletes the document with the given id.
     * @param unid document id
     * @param beanClass class whose lower-cased simple name is the document type
     * @return true when the document was found
     */
    @Override
    public boolean delete(String unid, Class beanClass) {
        DeleteResponse response = client
                .prepareDelete(EsHandler.ES_CLUSTER_NAME, typeOf(beanClass), unid)
                .get();
        return response.isFound();
    }

    /**
     * Fetches a single object by id.
     * @param unid document id
     * @param beanClass class to convert the document to
     * @param <T> bean type
     * @return the bean, or null when no such document exists
     */
    @Override
    public <T extends LogBeanEntity> T get(String unid, Class<T> beanClass) {
        GetResponse response = client
                .prepareGet(EsHandler.ES_CLUSTER_NAME, typeOf(beanClass), unid)
                .get();
        if (!response.isExists()) {
            return null;
        }
        return toBean(response.getSourceAsString(), beanClass);
    }

    /**
     * Fetches one page of objects using from + size paging.
     * @param queryInfo query information (conditions, paging, sorting)
     * @param beanClass class to convert the documents to
     * @param defaultSort sort field used when the query does not name one
     * @param <T> bean type
     * @return grid data holding the page content and the total hit count
     */
    @Override
    public <T extends LogBeanEntity> GridData<T> getList(QueryInfo queryInfo, Class<T> beanClass, String defaultSort) {
        SearchRequestBuilder builder = client.prepareSearch(EsHandler.ES_CLUSTER_NAME)
                .setTypes(typeOf(beanClass))
                .setSearchType(SearchType.DEFAULT);
        // Apply conditions, paging and sorting.
        EsDataGridUtils.parseQueryInfo(builder, queryInfo, beanClass, defaultSort);

        SearchResponse response = builder.execute().actionGet();
        SearchHits hits = response.getHits();
        List<T> list = new ArrayList<>();
        for (SearchHit hit : hits.getHits()) {
            list.add(toBean(hit.getSourceAsString(), beanClass));
        }

        GridData<T> gridData = new GridData<>();
        gridData.setData(list);
        gridData.setTotal(hits.getTotalHits());
        return gridData;
    }

    /** Returns the document type used for a bean class: its lower-cased simple name. */
    private static String typeOf(Class<?> beanClass) {
        return beanClass.getSimpleName().toLowerCase();
    }

    /** Serializes the bean to JSON and copies its "unid" into "id", which serves as the document id. */
    private static JSONObject toDocument(Object obj) {
        JSONObject json = JSONObject.fromObject(obj);
        json.put("id", json.get("unid"));
        return json;
    }

    /** Converts a stored document source back into a bean. */
    private static <T extends LogBeanEntity> T toBean(String source, Class<T> beanClass) {
        JSONObject json = JSONObject.fromObject(source);
        // TODO beanFlag is stored as the string "null", which breaks the bean conversion; clear it for now.
        json.put("beanFlag", null);
        T bean = beanClass.cast(JSONObject.toBean(json, beanClass));
        // TODO after the JSON -> bean conversion "time" holds the current system time; restore the stored value.
        bean.setTime(json.getLong("time"));
        return bean;
    }
}

这里分页使用的是from+size方式。因为我们的业务需要支持跳页以及上一页、下一页,所以无法使用scroll方式;但是from+size默认只能查询前10000条以内的数据(受index.max_result_window限制,默认值为10000),因此ElasticSearch无法满足我们目前的业务需求,最后改用了Solr。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ElasticSearch Solr JDK
相关文章推荐