您的位置:首页 > 数据库 > MySQL

ElasticSearch5.4.3使用logstash的logstash-input-jdbc实现mysql数据同步

2017-08-24 16:48 1316 查看

1.安装软件

安装elasticsearch、logstash、mysql这里就不多说了

2.安装logstash-input-jdbc

进入logstash的bin目录

cd /home/lyt/AppRepo/elasticsearch/logstash-5.4.3/bin

执行插件安装

./logstash-plugin install logstash-input-jdbc


3.编写同步脚本

在logstash的目录创建logstash-mysql目录

在logstash-mysql目录新建logstash-mysql.conf脚本和同步的init-ila.sql文件。

logstash-mysql.conf的内容如下:

input {
jdbc {
jdbc_driver_library => "/home/lyt/AppRepo/elasticsearch/logstash-5.4.3/logstash-mysql/mysql-connector-java-5.1.43.jar"
jdbc_driver_class => "com.mysql.jdbc.Driver"
jdbc_connection_string => "jdbc:mysql://127.0.0.1:3306/las?characterEncoding=UTF-8&useSSL=false"
jdbc_user => "lyt"
jdbc_password => "000000"
statement_filepath => "/home/lyt/AppRepo/elasticsearch/logstash-5.4.3/logstash-mysql/init-ila.sql"
tracking_column => "update_date"
jdbc_paging_enabled => "true"
jdbc_page_size => "50000"
schedule => "* * * * *"
type => "knowledge"
jdbc_default_timezone =>"Asia/Shanghai"
}
}

filter {
json {
source => "message"
remove_field => ["message"]
}
}

output {
stdout {
codec => json_lines
}
elasticsearch {
hosts => "127.0.0.1:9200"
index => "ila_knowledge"
document_id => "%{id}"
doc_as_upsert => true
"document_type" => "%{type}"
action => "update"
}
}

init-ila.sql文件内容如下:

select * from las_knowledge where update_date > :sql_last_value

整个目录结构:

[lyt@localhost logstash-mysql]$ tree
.
├── init-ila.sql
├── logstash-mysql.conf
└── mysql-connector-java-5.1.43.jar


4.启动脚本同步

执行同步前务必要自己先创建好index和type,并设置好分词器(中文ik分词器),如果不自己创建的话,elasticsearch会自动给你创建,但是一个field的类型和分词器并不好,导致查询不准确!!!

./bin/logstash -f logstash-mysql/logstash-mysql.conf


5.总结

logstash-input-jdbc能较好的实现mysql的insert、update的操作的增量、全量数据同步更新到ES。
但delete操作的实时同步没有很好的解决方案,我暂时使用程序操作elasticsearch删除的方式进行。

程序使用java:

/**
* 删除
*
* @throws Exception
*/
@Test
public void deleteEmployee() throws Exception {
DeleteResponse response = client.prepareDelete("las_knowledge", "knowledge", "09b4520a-99f1-4f0e-82df-885ef53791a6").get();
String id = response.getId();
System.out.print(id);
}


进行数据同步前最好手动创建index和type,并设置好IK分词器。

建议设置elasticsearch不自动创建索引!通过在
config/elasticsearch.yml
的每个节点下添加下面的配置:

action.auto_create_index: false
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息