
Integrating Elasticsearch with Spring Boot in Detail: A Hands-On Handbook for Project Setup

2019-03-06 20:25

I: Introduction to Elasticsearch

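Elasticsearch (ES) is built on top of Lucene. It is an open-source, highly scalable full-text search and analytics engine suited to storing and searching large volumes of data.
Key features of ES: distributed, highly available, asynchronous writes.
Core concepts of ES: near real-time | cluster | node (stores data) | index | shard (an index is split into shards) | replica (each shard can have several replicas)
Who uses ES: Wikipedia, Stack Overflow, GitHub.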
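To make the index / shard / replica concepts above concrete, here is a small stdlib-only Java sketch. It is illustrative only: it assumes the ES 5.x/6.x defaults of 5 primary shards and 1 replica, the class and method names are hypothetical, and String.hashCode() stands in for the Murmur3 hash that Elasticsearch actually applies to the routing value (the document id by default). What it shows is that a document's shard depends on the number of primary shards, which is why that number is set when an index is created and cannot simply be changed afterwards.

```java
/**
 * Illustrative sketch only: ties the index / shard / replica concepts together.
 * Elasticsearch routes a document with shard = hash(_routing) % number_of_primary_shards
 * using Murmur3; String.hashCode() below is a stand-in, so real shard numbers differ.
 */
public class ShardRoutingSketch {

    /** Every primary shard plus its replicas, e.g. 5 primaries with 1 replica each -> 10 copies. */
    public static int totalShardCopies(int primaryShards, int replicasPerShard) {
        return primaryShards * (1 + replicasPerShard);
    }

    /** Chooses the primary shard for a routing value (by default, the document id). */
    public static int shardFor(String routing, int primaryShards) {
        // floorMod keeps the result in [0, primaryShards) even for negative hash codes
        return Math.floorMod(routing.hashCode(), primaryShards);
    }

    public static void main(String[] args) {
        int primaries = 5; // default number_of_shards in ES 5.x/6.x
        int replicas = 1;  // default number_of_replicas
        System.out.println("shard copies: " + totalShardCopies(primaries, replicas));
        for (String id : new String[] {"1", "2", "42"}) {
            // the same id always lands on the same shard, but only while the primary count is unchanged
            System.out.println("doc " + id + " -> shard " + shardFor(id, primaries));
        }
    }
}
```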

II: Version compatibility

Spring Boot version (x)    Spring Data Elasticsearch version (y)    Elasticsearch version (z)
x <= 1.3.5                 y <= 1.3.4                               z <= 1.7.2*
x >= 1.4.x                 2.0.0 <= y < 5.0.0**                     2.0.0 <= z < 5.0.0**
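The ranges in the table are half-open (lower bound inclusive, upper bound exclusive), which is easy to misread by hand. The following is a hypothetical helper, not part of any Spring or Elasticsearch API, that compares dotted numeric versions and tests lower <= v < upper. It assumes purely numeric components and does not handle qualifiers such as -SNAPSHOT or .RELEASE.

```java
/**
 * Hypothetical helper: checks whether a dotted version such as "5.2.2" falls into a
 * half-open range [lower, upper), as in the "2.0.0 <= z < 5.0.0" row of the table.
 * Only numeric components are supported; qualifiers like "-SNAPSHOT" are not.
 */
public class VersionRange {

    /** Compares dotted numeric versions component by component; missing parts count as 0. */
    public static int compare(String a, String b) {
        String[] pa = a.split("\\.");
        String[] pb = b.split("\\.");
        int n = Math.max(pa.length, pb.length);
        for (int i = 0; i < n; i++) {
            int x = i < pa.length ? Integer.parseInt(pa[i]) : 0;
            int y = i < pb.length ? Integer.parseInt(pb[i]) : 0;
            if (x != y) {
                return Integer.compare(x, y);
            }
        }
        return 0;
    }

    /** True when lowerInclusive <= version < upperExclusive. */
    public static boolean inRange(String version, String lowerInclusive, String upperExclusive) {
        return compare(version, lowerInclusive) >= 0 && compare(version, upperExclusive) < 0;
    }

    public static void main(String[] args) {
        // Elasticsearch 2.4.6 satisfies 2.0.0 <= z < 5.0.0; 5.2.2 does not
        System.out.println(inRange("2.4.6", "2.0.0", "5.0.0"));
        System.out.println(inRange("5.2.2", "2.0.0", "5.0.0"));
    }
}
```

For example, it confirms that Elasticsearch 5.2.2, used in part III, lies outside the 2.0.0 <= z < 5.0.0 row, which is why part III falls back to TransportClient.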

III: Integrating Elasticsearch 5.x with Spring Boot 1.x

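(1) If the project is locked to Spring Boot 1.5.x or earlier, spring-boot-starter-data-elasticsearch cannot be used (per the table above, it only supports Elasticsearch below 5.0). Instead, integrate Elasticsearch 5.x through TransportClient.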

(2) Download Elasticsearch 5.2.2 from:
https://www.elastic.co/downloads/past-releases

(3) Configuration files

config/elasticsearch.yml   main configuration file
config/jvm.options         JVM options
config/log4j2.properties   logging configuration

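(4) Key settings in elasticsearch.yml

Property                       Value           Description
cluster.name                   hsyt            Nodes that share the same cluster name join the same cluster.
node.name                      node-1          By default Elasticsearch uses the first 7 characters of a randomly generated UUID as the node ID; it can also be set manually.
path.conf                      /path/to/conf   Directory holding the configuration files; defaults to the config folder under the ES home directory.
path.logs                      /path/to/logs   Directory for log files; defaults to the logs folder under the ES home directory.
bootstrap.memory_lock          true            ES performs much worse once the JVM starts swapping, so swapping must be prevented; this is critical for node health. Setting this to true (locking the process memory) is one way to do that.
http.cors.enabled              true            Required when monitoring the cluster with tools such as elasticsearch-head.
http.cors.allow-origin         "*"             Required when monitoring the cluster with tools such as elasticsearch-head.
http.cors.allow-credentials    true            Required when monitoring the cluster with tools such as elasticsearch-head.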

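(5) Set up the Spring Boot project with the structure shown in the figure below (the steps are described in my earlier blog post and are omitted here):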
https://blog.csdn.net/For_niu/article/details/87875470

(6) Add the following dependencies to the pom.xml of both the service provider and the service consumer:

<properties>
<elasticsearch.version>5.2.2</elasticsearch.version>
</properties>

<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>transport</artifactId>
<version>${elasticsearch.version}</version>
</dependency>
</dependencies>

(7) The project uses the following table (the DAO layer exposes a query that returns all rows; omitted here):

CREATE TABLE `employee` (
`id` bigint(15) NOT NULL AUTO_INCREMENT COMMENT 'ID',
`name` varchar(100) DEFAULT NULL COMMENT 'name',
`sharding_id` bigint(15) DEFAULT NULL COMMENT 'sharding key',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=1098470660121956355 DEFAULT CHARSET=utf8 COMMENT='mycat table';

(8) The microservice implementation class, using index creation as the example:

@Component
@Service(version = "0.0.1", interfaceClass = TestService.class, registry = "sb", group = "sbTest", timeout = 3000)
public class TestServiceImpl implements TestService {

private static final Logger LOGGER = LoggerFactory.getLogger(TestServiceImpl.class);

@Resource
private TestDao testDao;

// volatile so the lazily created client is safely published to other threads
private static volatile TransportClient client;

private static final Object lock = new Object();

@Value("${elasticSearch.clusterName}")
private String clusterName;

@Value("${elasticSearch.hostList}")
private String hostList;

@Value("${elasticSearch.esIndex}")
private String esIndex;

@Override
public Integer count() {
Integer num = 0;
try {
num = testDao.count();
} catch (Exception e) {
LoggerUtil.error(LOGGER, "count error : {0}", e.getMessage());
}
return num;
}

@Override
public Integer createEs() {
TransportClient esClient = getClient();
try {
esClient.admin().indices().prepareCreate(esIndex).execute().actionGet();
PutMappingRequest putMapping = Requests.putMappingRequest(esIndex).type(esIndex).source(createMapping(esIndex));
esClient.admin().indices().putMapping(putMapping).actionGet();
} catch (Exception e) {
LoggerUtil.error(LOGGER, "prepareCreate error : {0}", e.getMessage());
}
BulkRequestBuilder bulkRequest = null;
try {
bulkRequest = esClient.prepareBulk();
List<Test> list = testDao.selTest();
for (Test test : list) {
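// prepareIndex(index, type, id) expects a String id, while the table's id column is a bigint
IndexRequest request = esClient.prepareIndex(esIndex, esIndex, String.valueOf(test.getId())).setSource(obj2JsonData(test)).request();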
bulkRequest.add(request);
}
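BulkResponse bulkResponse = bulkRequest.execute().actionGet();
// a bulk request can partially fail without throwing, so check the per-item results
if (bulkResponse.hasFailures()) {
LoggerUtil.error(LOGGER, "addIndexData failures : {0}", bulkResponse.buildFailureMessage());
}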
} catch (Exception e) {
LoggerUtil.error(LOGGER, "addIndexData error : {0}", e.getMessage());
}
// bulkRequest stays null if prepareBulk() itself failed
return bulkRequest == null ? 0 : bulkRequest.numberOfActions();
}

public TransportClient getClient() {
try {
// double-checked locking: client is volatile and is checked again inside the lock
if (client == null) {
synchronized (lock) {
if (client == null) {
String[] hostAndPorts = hostList.split(",");
Settings settings = Settings.builder().put("cluster.name", clusterName).put("client.transport.sniff", true).build();
Map<String, Integer> hostInfos = parseHostAndPorts(hostAndPorts);
TransportClient newClient = new PreBuiltTransportClient(settings);
for (Map.Entry<String, Integer> entry : hostInfos.entrySet()) {
newClient.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(entry.getKey()), entry.getValue()));
}
// publish the client only after all transport addresses are registered
client = newClient;
}
}
}
} catch (Exception e) {
LoggerUtil.error(LOGGER, "failed to build client: {0}", e.getMessage());
}
return client;
}

public static Map<String, Integer> parseHostAndPorts(String[] hostAndPorts) {
Map<String, Integer> hostMap = new HashMap<String, Integer>();
if (hostAndPorts == null) {
return hostMap;
}
for (String hostAndPortStr : hostAndPorts) {
String[] hostAndPort = hostAndPortStr.split(":");
if (hostAndPort.length != 2 || !StringUtils.isNumeric(hostAndPort[1])) {
throw new IllegalStateException("端口号信息不正确");
}
hostMap.put(hostAndPort[0], Integer.parseInt(hostAndPort[1]));
}
return hostMap;
}

public static XContentBuilder createMapping(String indexType) {
XContentBuilder mapping = null;
try {
mapping = XContentFactory.jsonBuilder().startObject()
.startObject(indexType).startObject("properties")
.startObject("id").field("type", "text").field("fielddata", "true").endObject()
.startObject("name").field("type", "text").endObject()
.startObject("shardingId").field("type", "text").endObject()
.endObject().endObject().endObject();
} catch (Exception e) {
LoggerUtil.error(LOGGER, "createMapping error : {0}", e.getMessage());
}
return mapping;
}

public static String obj2JsonData(Test test) {
String jsonData = null;
try {
XContentBuilder jsonBuild = XContentFactory.jsonBuilder().startObject()
.field("id", test.getId())
.field("name", test.getName()) // must match the "name" field declared in createMapping
.field("shardingId", test.getShardingId())
.endObject();
jsonData = jsonBuild.string();
} catch (Exception e) {
LoggerUtil.error(LOGGER, "obj2JsonData error : {0}", e.getMessage());
}
return jsonData;
}
}
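getClient() above lazily creates one shared TransportClient. For that to be thread-safe the field must be volatile and the null check must be repeated inside the synchronized block; otherwise two threads can both pass the first check and each build a client. The sketch below shows the pattern on its own, as a generic holder so that it runs without Elasticsearch on the classpath. LazySingleton is a hypothetical name; in the service the factory would build the PreBuiltTransportClient. In a Spring application, declaring the client once as a @Bean is a simpler alternative.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/**
 * Sketch of thread-safe lazy initialisation (double-checked locking).
 * Generic so it runs stand-alone; in the ES service, T would be TransportClient.
 */
public class LazySingleton<T> {

    private final Supplier<T> factory;
    // volatile so other threads never observe a partially constructed instance
    private volatile T instance;

    public LazySingleton(Supplier<T> factory) {
        this.factory = factory;
    }

    public T get() {
        T result = instance;              // first check, no locking on the fast path
        if (result == null) {
            synchronized (this) {
                result = instance;        // second check, under the lock
                if (result == null) {
                    result = factory.get();
                    instance = result;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger created = new AtomicInteger();
        LazySingleton<Object> holder = new LazySingleton<>(() -> {
            created.incrementAndGet();
            return new Object();
        });
        Thread[] threads = new Thread[8];
        for (int i = 0; i < threads.length; i++) {
            threads[i] = new Thread(holder::get);
            threads[i].start();
        }
        for (Thread t : threads) {
            t.join();
        }
        System.out.println("instances created: " + created.get());
    }
}
```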

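(9) The properties file application-dev.properties (the project uses per-environment profiles) contains the following. elasticSearch.clusterName must equal cluster.name in the server's elasticsearch.yml (the example values here and in the settings table above differ, so align them), otherwise TransportClient will not connect to the node. Port 9300 is the transport port used by TransportClient, not the 9200 HTTP port.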

elasticSearch.clusterName=yt-spring-boot-index
elasticSearch.hostList=localhost:9300
elasticSearch.esIndex=employee
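elasticSearch.hostList can list several nodes separated by commas. parseHostAndPorts above stores them in a HashMap keyed by host name, so two nodes on the same machine (for example localhost:9300,localhost:9301) collapse into a single entry. A minimal alternative that keeps every entry is sketched below. HostListParser is a hypothetical name, and the unresolved addresses it returns would still need resolving (for example with InetAddress.getByName) before being handed to the client.

```java
import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical parser for "host:port,host:port". Unlike a HashMap keyed by host,
 * it keeps every entry, so several nodes on one machine are all registered.
 */
public class HostListParser {

    public static List<InetSocketAddress> parse(String hostList) {
        List<InetSocketAddress> addresses = new ArrayList<>();
        for (String entry : hostList.split(",")) {
            String trimmed = entry.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate trailing commas
            }
            int colon = trimmed.lastIndexOf(':');
            if (colon <= 0 || colon == trimmed.length() - 1) {
                throw new IllegalArgumentException("expected host:port but got: " + trimmed);
            }
            String host = trimmed.substring(0, colon);
            int port = Integer.parseInt(trimmed.substring(colon + 1)); // NumberFormatException is an IllegalArgumentException
            // createUnresolved avoids a DNS lookup here; resolve when building the client
            addresses.add(InetSocketAddress.createUnresolved(host, port));
        }
        return addresses;
    }

    public static void main(String[] args) {
        for (InetSocketAddress address : parse("localhost:9300,localhost:9301")) {
            System.out.println(address.getHostString() + " -> " + address.getPort());
        }
    }
}
```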

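(10) Start the services
First start ZooKeeper and Elasticsearch 5.2.2, then the microservice, and finally the web service.
Then send a request with a RESTful web service client to create the index.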
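(11) Use the elasticsearch-head visualisation tool (elasticsearch-head-master) to monitor and operate Elasticsearch.
Download: https://github.com/mobz/elasticsearch-head

Open index.html in the browser you normally use and browse the index, as shown in the figure below.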

IV: Spring Boot + Spring Data Elasticsearch

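(1) As mentioned above, the Elasticsearch version that Spring Data Elasticsearch bundles under Spring Boot 1.x is too old for this project's needs. This section uses Spring Boot 2.x + Spring Data Elasticsearch to provide full-text search against a newer Elasticsearch.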

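(2) Add the following dependency to the entity module, the service provider and the service consumer. (Note: the project uses Spring Boot 2.1.3.RELEASE, and its source shows that the matching Elasticsearch version is 6.2.2, so the Elasticsearch server you download must be newer than 6.2.2!)
Download from https://www.elastic.co/downloads/past-releases; Elasticsearch 6.6.1 is recommended.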

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

(3) Add the @Document annotation to the entity class. This is important: it configures the index name and type.

@Getter
@Setter
@EqualsAndHashCode
@NoArgsConstructor
@Document(indexName = "wangze", type = "wangze")
public class Test implements Serializable{

private static final long serialVersionUID = -911018888247361501L;

private String id;
private String name;
private String lat;
private String lon;
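// mapped as geo_point so that geoDistanceQuery/geoDistanceSort on "address" work; the value is a "lat,lon" string
@GeoPointField
private String address;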
private String shardingId;
}
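The address field stores the location as a "lat,lon" string, one of the formats Elasticsearch accepts for a geo_point (the field must also be mapped as geo_point; see the fourth error at the end). The controller below builds that string with new DecimalFormat(...), which follows the JVM's default locale. On a machine whose locale uses a comma as decimal separator (German, for example) the formatter emits "38,929986" and Double.valueOf then rejects it. A locale-independent helper is sketched below; GeoPointText and latLon are hypothetical names.

```java
import java.util.Locale;

/**
 * Hypothetical helper that builds the "lat,lon" text form of a geo_point.
 * Locale.ROOT guarantees '.' as the decimal separator regardless of the JVM locale.
 */
public class GeoPointText {

    public static String latLon(double lat, double lon) {
        if (lat < -90 || lat > 90 || lon < -180 || lon > 180) {
            throw new IllegalArgumentException("latitude/longitude out of range");
        }
        // six decimals, like the "######0.000000" pattern used in the controller
        return String.format(Locale.ROOT, "%.6f,%.6f", lat, lon);
    }

    public static void main(String[] args) {
        System.out.println(latLon(38.929986, 117.395645));
    }
}
```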

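(4) Create an xxxRepository.java interface that extends ElasticsearchRepository. This is important: it provides CRUD, paging and similar operations out of the box, which cover most business needs. ElasticsearchTemplate complements ElasticsearchRepository with lower-level operations, for example bulk indexing of tens of millions of documents.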

import com.wangze.sb.api.entity.test.Test;
import org.springframework.data.elasticsearch.repository.ElasticsearchRepository;
import org.springframework.stereotype.Component;

@Component
public interface TestRepository extends ElasticsearchRepository<Test, String> {
// May stay empty, or declare derived query methods such as findByAlipayCount, where AlipayCount is the camel-case name of an entity field.
}

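(5) The business code below implements: add a document, bulk add, search by name, find all documents, delete by entity, delete all, and a query that filters by distance, sorts by distance and then paginates: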

TestService.java:

public interface TestService {

void addEs(Test test);

void addEsBulk(List<Test> list);

List<Test> searchEs(String name);

List<Test> findAllEs();

void deleteEs(Test test);

void deleteALl();

List<Test> queryByDistanceAndSort();

}

TestServiceImpl.java:

@Component
@Service(version = "0.0.1", interfaceClass = TestService.class, registry = "sb", group = "sbTest", timeout = 3000)
public class TestServiceImpl implements TestService {

private static final Logger LOGGER = LoggerFactory.getLogger(TestServiceImpl.class);

private static final String TEST_INDEX_NAME = "wangze";

@Resource
private TestDao testDao;

@Resource
private TestRepository testRepository;

@Autowired
private ElasticsearchTemplate elasticsearchTemplate;

@Override
public void addEs(Test test) {
try {
testRepository.save(test);
} catch (Exception e) {
LoggerUtil.error(LOGGER, "addEs error", e.getMessage());
}
}

@Override
public void addEsBulk(List<Test> tests) {
int counter = 0;
try {
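if (!elasticsearchTemplate.indexExists(Test.class)) {
// create the index together with the mapping derived from the entity annotations (e.g. @GeoPointField)
elasticsearchTemplate.createIndex(Test.class);
elasticsearchTemplate.putMapping(Test.class);
}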
List<IndexQuery> queries = new ArrayList<>();
for (Test test : tests) {
IndexQuery indexQuery = new IndexQuery();
indexQuery.setId(String.valueOf(test.getId()));
indexQuery.setObject(test);
indexQuery.setIndexName(TEST_INDEX_NAME);
indexQuery.setType(TEST_INDEX_NAME);
queries.add(indexQuery);
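counter++;
// send a bulk request every 500 documents (incrementing first avoids flushing after the very first one)
if (counter % 500 == 0) {
elasticsearchTemplate.bulkIndex(queries);
queries.clear();
}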
}
if (queries.size() > 0) {
elasticsearchTemplate.bulkIndex(queries);
}
} catch (Exception e) {
LoggerUtil.error(LOGGER, "addEsBulk error", e.getMessage());
}
}

@Override
public List<Test> searchEs(String name) {
Pageable pageRequest = PageRequest.of(0, 50);
Iterable<Test> test = testRepository.search(QueryBuilders.matchQuery("name", name), pageRequest);
return Lists.newArrayList(test);
}

@Override
public List<Test> findAllEs() {
Iterable<Test> test = testRepository.findAll();
return Lists.newArrayList(test);
}

@Override
public void deleteEs(Test test) {
testRepository.delete(test);
}

@Override
public void deleteALl() {
testRepository.deleteAll();
}

@Override
public List<Test> queryByDistanceAndSort() {
List<Test> personList = new ArrayList<>();
try {
double lat = 39.929986;
double lon = 116.395645;
long nowTime = System.currentTimeMillis();
// NOTE: 100000 km effectively disables the radius filter; for the 100 km radius described in the test results, use distance(100, DistanceUnit.KILOMETERS)
GeoDistanceQueryBuilder filterBuilder = QueryBuilders.geoDistanceQuery("address").point(lat, lon).distance(100000, DistanceUnit.KILOMETERS);
// sort by distance from the same point, nearest first
GeoDistanceSortBuilder sortBuilder = SortBuilders.geoDistanceSort("address", new GeoPoint(lat, lon)).unit(DistanceUnit.METERS).order(SortOrder.ASC);
Pageable pageable = PageRequest.of(0, 50);
NativeSearchQueryBuilder queryBuilder = new NativeSearchQueryBuilder().withFilter(filterBuilder).withSort(sortBuilder).withPageable(pageable);
personList = elasticsearchTemplate.queryForList(queryBuilder.build(), Test.class);
LoggerUtil.info(LOGGER, "queryByDistanceAndSort took (ms): " + (System.currentTimeMillis() - nowTime));
} catch (Exception e) {
LoggerUtil.error(LOGGER, "queryByDistanceAndSort error", e.getMessage());
return personList;
}
}
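The batching in addEsBulk (collect index requests, send one bulk request per 500 documents, then send the remainder) can be sketched independently of Spring. Here the flush callback is a hypothetical stand-in for elasticsearchTemplate.bulkIndex(queries), and BatchFlusher is a hypothetical name. Testing the buffer size instead of a separate counter sidesteps the off-by-one that a counter % 500 == 0 check made before incrementing produces (it fires on the very first item). The copy handed to the callback only matters if the callee keeps the list; bulkIndex is synchronous, so clearing the buffer afterwards is safe there.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Sketch: flush items in fixed-size batches, then flush whatever is left over. */
public class BatchFlusher {

    public static <T> int flushInBatches(List<T> items, int batchSize, Consumer<List<T>> flush) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        List<T> batch = new ArrayList<>(batchSize);
        int flushes = 0;
        for (T item : items) {
            batch.add(item);
            if (batch.size() == batchSize) {
                flush.accept(new ArrayList<>(batch)); // hand over a copy, then reuse the buffer
                batch.clear();
                flushes++;
            }
        }
        if (!batch.isEmpty()) {
            flush.accept(new ArrayList<>(batch)); // the final, partial batch
            flushes++;
        }
        return flushes;
    }

    public static void main(String[] args) {
        List<Integer> ids = new ArrayList<>();
        for (int i = 0; i < 10000; i++) {
            ids.add(i);
        }
        int flushes = flushInBatches(ids, 500, batch -> System.out.println("bulk of " + batch.size()));
        System.out.println(flushes + " bulk requests");
    }
}
```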

TestController.java:

@RestController
@RequestMapping("/sb")
public class TestController {

private static final Logger LOGGER = LoggerFactory.getLogger(TestController.class);

@Reference(group = "sbTest", registry = "sb", version = "0.0.1")
private TestService testService;

@RequestMapping(value = "/test")
public @ResponseBody
ResultMessage count(@RequestBody Test test) {
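ResultMessage result = new ResultMessage(true, "query succeeded");
try {
// count() comes from the part III service; declare Integer count() in TestService (and implement it) if you keep this endpoint
result.setContent(testService.count());
} catch (Exception e) {
result.change(false, "query failed: " + e.getMessage());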
}
return result;
}

@RequestMapping(value = "/addEs")
public ResultMessage addEs() {
ResultMessage result = new ResultMessage(true, "index created");
try {
Test test = new Test();
Random random = new Random();
test.setId(String.valueOf(random.nextInt()));
test.setName("name" + random.nextInt());
double max = 0.00001;
double min = 0.000001;
double lat = 38.929986;
double lon = 117.395645;
DecimalFormat df = new DecimalFormat("######0.000000");
double s = random.nextDouble() % (max - min + 1) + max;
String lons = df.format(s + lon);
String lats = df.format(s + lat);
Double dlon = Double.valueOf(lons);
Double dlat = Double.valueOf(lats);
test.setAddress(dlat + "," + dlon);
testService.addEs(test);
} catch (Exception e) {
result.change(false, "failed to create index: " + e.getMessage());
}
return result;
}

@RequestMapping(value = "/addEsBulk")
public ResultMessage addEsBulk() {
ResultMessage result = new ResultMessage(true, "index created");
try {
double lat = 38.929986;
double lon = 117.395645;
List<Test> testList = new ArrayList<>(10000);
// create the Random and the formatter once instead of on every iteration
Random random = new Random();
DecimalFormat df = new DecimalFormat("######0.000000");
double max = 0.00001;
double min = 0.000001;
for (int i = 0; i < 10000; i++) {
// yields an offset of roughly [0.00001, 1.00001) degrees
double s = random.nextDouble() % (max - min + 1) + max;
String lons = df.format(s + lon);
String lats = df.format(s + lat);
Double dlon = Double.valueOf(lons);
Double dlat = Double.valueOf(lats);

Test test = new Test();
test.setId(String.valueOf(i));
test.setName("name" + i);
test.setAddress(dlat + "," + dlon);
testList.add(test);
}
testService.addEsBulk(testList);
} catch (Exception e) {
result.change(false, "failed to create index: " + e.getMessage());
}
return result;
}

@RequestMapping(value = "/searchEsByName")
public ResultMessage searchEs(@RequestBody Test test) {
ResultMessage result = new ResultMessage(true, "search by name succeeded");
try {
List<Test> tests = testService.searchEs(test.getName());
result.setContent(JSON.toJSONString(tests));
} catch (Exception e) {
result.change(false, "search by name failed: " + e.getMessage());
}
return result;
}

@RequestMapping(value = "/findAllEs")
public ResultMessage findAll(@RequestBody Test test) {
ResultMessage result = new ResultMessage(true, "find all succeeded");
try {
List<Test> all = testService.findAllEs();
result.setContent(JSON.toJSONString(all));
} catch (Exception e) {
result.change(false, "find all failed: " + e.getMessage());
}
return result;
}

@RequestMapping(value = "/deleteEsById")
public ResultMessage deleteEs(@RequestBody Test test) {
ResultMessage result = new ResultMessage(true, "document deleted");
try {
testService.deleteEs(test);
} catch (Exception e) {
result.change(false, "failed to delete document: " + e.getMessage());
}
return result;
}

@RequestMapping(value = "/deleteAll")
public ResultMessage deleteAll() {
ResultMessage result = new ResultMessage(true, "all documents deleted");
try {
testService.deleteALl();
} catch (Exception e) {
result.change(false, "failed to delete all documents: " + e.getMessage());
}
return result;
}

@RequestMapping(value = "/queryByDistanceAndSort")
public ResultMessage queryByDistanceAndSort() {
ResultMessage result = new ResultMessage(true, "distance query succeeded");
try {
List<Test> all = testService.queryByDistanceAndSort();
result.setContent(JSON.toJSONString(all));
} catch (Exception e) {
result.change(false, "distance query failed: " + e.getMessage());
}
return result;
}

}

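(6) Start Elasticsearch 6.6.1 and ZooKeeper, then the microservice, and finally the web service. Test with the RESTful Web Services tool in IntelliJ IDEA, as shown in the figures below.
Figure 1 shows the result of creating the index:

Figures 2 and 3 show the results of searching by name:


Figure 4 shows the result of the query: within 100 km + sorted by nearest distance + first page of 50: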
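To sanity-check distance results like these, it helps to compute great-circle distances yourself. The sketch below uses the haversine formula with a mean Earth radius; Haversine and distanceKm are hypothetical names, and Elasticsearch's own geo code may differ slightly (radius constant, arc versus plane calculation), so expect small deviations rather than exact agreement. Applied to the source's own coordinates, the base point the controller uses for sample data (38.929986, 117.395645) is about 140 km from the query centre in queryByDistanceAndSort (39.929986, 116.395645), and the random offsets only move the points further than 100 km away. So the generated sample data would not match a true 100 km radius; it does match the 100000 km radius actually passed in the code.

```java
/**
 * Sketch: great-circle distance with the haversine formula.
 * Elasticsearch's implementation differs in details, so small deviations are expected.
 */
public class Haversine {

    static final double EARTH_RADIUS_KM = 6371.0088; // mean Earth radius

    public static double distanceKm(double lat1, double lon1, double lat2, double lon2) {
        double dLat = Math.toRadians(lat2 - lat1);
        double dLon = Math.toRadians(lon2 - lon1);
        double a = Math.sin(dLat / 2) * Math.sin(dLat / 2)
                + Math.cos(Math.toRadians(lat1)) * Math.cos(Math.toRadians(lat2))
                * Math.sin(dLon / 2) * Math.sin(dLon / 2);
        return 2 * EARTH_RADIUS_KM * Math.asin(Math.sqrt(a));
    }

    public static void main(String[] args) {
        // query centre from queryByDistanceAndSort vs. the base point used by the controller
        double d = distanceKm(39.929986, 116.395645, 38.929986, 117.395645);
        System.out.printf("%.1f km%n", d);
        System.out.println(d <= 100 ? "inside 100 km" : "outside 100 km");
    }
}
```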

(7) Pitfalls hit while building this setup, and how to fix them

Error 1:

Description:

The bean 'testRepository', defined in null, could not be registered. A bean with that name has already been defined in null and overriding is disabled.

Action:

Consider renaming one of the beans or enabling overriding by setting spring.main.allow-bean-definition-overriding=true

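If you see the error above, allow bean definition overriding (a later bean definition may then replace an earlier one with the same name) by adding the following to application-dev.properties and application.yml respectively.
In application-dev.properties: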

spring.main.allow-bean-definition-overriding=true

In application.yml:

spring:
  main:
    allow-bean-definition-overriding: true

Error 2:

nested exception is java.lang.IllegalStateException: availableProcessors is already set to [4], rejecting [4]

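This error indicates a Netty conflict: Netty's available-processors value has already been set by another component before the Elasticsearch client tries to set it again.
Add the following as the very first line of the main method in the microservice or web startup class: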

System.setProperty("es.set.netty.runtime.available.processors","false");
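Where exactly that line goes can be sketched as a startup class. EsApplication is a hypothetical class name, and the SpringApplication.run call is left as a comment so that the sketch compiles without Spring on the classpath. The only point it makes is ordering: the property has to be set before any Elasticsearch or Netty class initialises, which is why it comes first in main().

```java
/**
 * Hypothetical startup class showing where the property must be set.
 * The Spring Boot call is commented out so this compiles stand-alone.
 */
public class EsApplication {

    public static void disableEsNettyProcessorSetting() {
        // tells the Elasticsearch client not to set Netty's availableProcessors a second time
        System.setProperty("es.set.netty.runtime.available.processors", "false");
    }

    public static void main(String[] args) {
        disableEsNettyProcessorSetting(); // must run before anything touches Elasticsearch/Netty
        // SpringApplication.run(EsApplication.class, args);
        System.out.println(System.getProperty("es.set.netty.runtime.available.processors"));
    }
}
```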

Error 3:

Loading class `com.mysql.jdbc.Driver'. This is deprecated. The new driver class is `com.mysql.cj.jdbc.Driver'. The driver is automatically registered via the SPI and manual loading of the driver class is generally unnecessary.

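This message is printed by MySQL Connector/J 8.x when the old driver class name is still configured. Use an 8.x mysql-connector-java (8.0.12 is recommended) and switch to the new driver class, as follows.
In pom.xml: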

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.12</version>
</dependency>

Two settings in application-dev.properties change accordingly:

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/pms?serverTimezone=UTC&useSSL=false
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

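Error 4: When sorting by latitude/longitude distance, the entity field that holds the location must carry the geo-point annotation and type, as follows: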

@GeoPointField
private GeoPoint location;