您的位置:首页 > 数据库 > Oracle

ElasticSearch作为搜索引擎-Oracle数据同步

2017-11-01 15:42 190 查看
ElasticSearch作为搜索引擎,我们需要解决2大问题:
1, 如何将被搜索的数据在ES上创建反向索引
2, Java代码如何与ES交互
其中第一个大问题又分为两个小问题
1.1,如何初始化已有的数据
1.2,如何同步增量数据
第二个大问题也有两种集成方式
2.1 Spring Data 9300端口集成
2.2 Restful API 9200端口集成
 
本篇先解决第一大问题。
 
利用Logstash的收集转化功能来做数据的同步,所以要先安装好Logstash和ElasticSearch。
参考《ElasticSearch单节点安装》《ELK-ElasticSearch+Logstash+Kibana
与关系型数据库的同步,需要安装logstash-input-jdbc插件

进入到logstash的根目录,进入bin,执行以下命令
./logstash-plugininstall logstash-input-jdbc



作为Java开发人员找个ojdbc6.jar不算是难事吧,丢一个到logstash目录里,随便放,我放在了/home/docker/logstash/logstash-5.6.3下。
 
Logstash配置:
核心文件有2个,一个是input-filter-output格式的运行文件logstash.conf,一个是需要做索引的脚本文件jdbc_oracle.sql。
 
logstash.conf配置如下:

input{
stdin{

}
jdbc{
jdbc_connection_string => "jdbc:oracle:thin:@//192.168.120.156:1521/irm"
jdbc_user => "irmstagejx2"
jdbc_password => "******"
jdbc_driver_library => "/home/docker/logstash/logstash-5.6.3/ojdbc6.jar"
jdbc_driver_class => "Java::oracle.jdbc.driver.OracleDriver"
record_last_run => "true"
use_column_value => "false"
tracking_column => "id"
last_run_metadata_path => "/home/docker/logstash/logstash-5.6.3/info"
clean_run => "false"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
statement_filepath =>"/home/docker/logstash/logstash-5.6.3/jdbc_oracle.sql"
schedule => "* * * * *"
type => "tstype"
}
}

output{
elasticsearch{
hosts => "192.168.226.133:9200"
index => "index_entity"
document_id => "%{id}"
}
}

配置具体详解:
Jdbc开头的这些都比较好理解,注意ojdbc6.jar的目录要跟你自己存放该包的目录一致。
record_last_run:效果不明
use_column_value:配合jdbc_oracle.sql里的时间戳,此项配置成true,时间戳使用上次执行计划中最后的值,此项配置成false,时间戳使用上次执行的时间点。
tracking_column:效果不明。

last_run_metadata_path:配合use_column_value,上次执行的中间数据总要有地方来存储。
clean_run:每次执行是增量同步还是全量同步
statement_filepath:执行计划需要同步的脚本位置
schedule:执行计划时间策略,分、时、天、月、年,最小间隔是1分钟。上面配置的就是1分钟。
type:索引中的type
document_id:ES索引中id对应的脚本中属性
 
jdbc_oracle.sql脚本:
select id,name from mm_entitytype where updatedate>:sql_last_value

比较好理解,:sql_last_value是插件维护的最后执行时间,会存储到last_run_metadata_path中。
启动logstash:
bin/logstash-f logstash.conf
可以看到logstash.conf的配置已经生效:



由于我的数据都是老数据,我修改其中一条updatedate使其满足脚本中的判断条件



去ElasticSearch中搜索下,注意这里的index、type、source、id中的内容,都对应着我们前面的2个配置文件。



再看下last_run_metadata_path中是不是跟我们想象中的一样维护了组后一次执行时间。



OK,目前为止索引新增已经验证完毕,我们来修改下这条数据,看看es会不会更新
数据库中将TC修改成TC-ADD,1分钟后来ES中查看,发生了变化,证明修改也是可以的。



删除不尝试了,后面有详细说明和解决方案。
 
既然info中维护的是时间戳信息,那我尝试直接修改info想改个最小时间做一次全量增改,测试结果失败了,我猜应该info只是个备份的时间,真正生效的维护在内存中。于是先关闭logstash,修改info里的时间到一个很小的时间保证jdbc_oracle.sql中可以刷新到所有的数据,再重启logstash,果然这次生效了,es为我的这个表做了次全量的初始化更新。
 
那么问题来了,为什么delete不行?
logstash-input-jdbc不支持物理删除,我猜跟它的机制有关,假设记录在表中被物理删除,id随着记录一起被干掉了,等到logstash监听周期到了的时候根本没办法把这个已删除的id送给ES中的index了。为弥补这一缺陷,需要将物理删除改为逻辑删除,通过状态位来维护它是否存活,同时将这个状态信息同步给ElasticSearch,并在对索引进行搜索时将“死掉”的文档过滤掉。当然为了节约存储空间和减少搜索范围,最好定时将数据库中和ES中逻辑删除的部分通过手工或定时任务物理清理掉。
 
 
回头看我们开头分析的2大问题,第一大问题数据如何初始化和同步,已经解决了。下一篇博文我们来解决如何被代码调用。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
相关文章推荐