Logstash的logstash-input-jdbc插件mysql数据同步ElasticSearch及词库
2017-02-24 10:00
1006 查看
Logstash是一款轻量级的日志搜集处理框架,可以方便的把分散的、多样化的日志搜集起来,并进行自定义的处理,然后传输到指定的位置,比如某个服务器或者文件。
http://kibana.logstash.es/content/logstash/
风来了.fox
中文分词插件如下
http://blog.csdn.net/fenglailea/article/details/55506775#t17
http://blog.csdn.net/fenglailea/article/details/56282414
5.2 MYSQL 插件 logstash-input-jdbc插件
介绍
Mysql数据库:
表:product
IP:192.168.1.254
端口默认
ElasticSearch
IP:10.1.5.101
安装位置:/home/hadoop/elasticsearch-5.2.1
端口默认
Logstash:
IP:10.1.5.101
安装位置:/home/hadoop/logstash-5.2.1
端口默认
mysql 的Java 驱动包:
mysql-connector-java-5.1.40-bin.jar
安装位置:/home/hadoop/logstash-5.2.1/mysql-connector-java-5.1.40-bin.jar
索引和映射可以分开设置也可以同时设置。
这里使用同时设置
创建一个名叫 product 的索引,分词器用 ik_max_word,并创建一个 jdbc 的类型,里面有一个 title 的字段,指定其使用 ik_max_word 分词器
jdbc:类型
settings注释掉部分,表示可以自定义分析器,然后由 mappings中的analyzer 调用
https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
进入 logstash根目录
加入如下:
说明
index:索引名称
hosts:ES的IP地址及端口
document_id:自增ID
json_lines:JSON格式输出
接着创建 product.sql
加入
Logstash 修改时区 取巧方法,利用mysql数据库内置函数
date_add(:sql_last_value,interval 8 hour) Logstash使用的标准时间并没有使用的是本地时区时间,这里取个巧,在他的时间基础上增加8小时,不用修改 Logstash 的时区了
说明:
sql_last_value是每次读取数据库表时更新此值,表示最后更新时间
如果input里面use_column_value 设置为true,可以是我们设定的字段的上一次的值。
默认 use_column_value 为 false,这样 :sql_last_value为上一次更新的最后时间,
这样对于新增的值,才会更新,这样就实现了增量更新的目的。
具体请看
https://www.elastic.co/guide/en/logstash/5.2/plugins-inputs-jdbc.html#_state
http://kibana.logstash.es/content/logstash/
风来了.fox
elasticsearch 要把词库设置好
词库就保密了哈哈中文分词插件如下
http://blog.csdn.net/fenglailea/article/details/55506775#t17
logstash安装
http://blog.csdn.net/fenglailea/article/details/56282414logstash-input-jdbc插件 安装
请看 下面链接的http://blog.csdn.net/fenglailea/article/details/56282414
5.2 MYSQL 插件 logstash-input-jdbc插件
案例
当前用户 hadoop介绍
Mysql数据库:
表:product
IP:192.168.1.254
端口默认
ElasticSearch
IP:10.1.5.101
安装位置:/home/hadoop/elasticsearch-5.2.1
端口默认
Logstash:
IP:10.1.5.101
安装位置:/home/hadoop/logstash-5.2.1
端口默认
mysql 的Java 驱动包:
mysql-connector-java-5.1.40-bin.jar
安装位置:/home/hadoop/logstash-5.2.1/mysql-connector-java-5.1.40-bin.jar
删除索引和映射
如果已存在某索引,要重建该索引,那么就要删除他curl -XDELETE 'http://10.1.5.101:9200/product'
创建索引和映射(这一步最重要)
先建立索引,然后定义映射,数据最后添加索引和映射可以分开设置也可以同时设置。
这里使用同时设置
curl -XPUT "http://10.1.5.101:9200/product" -d '{ // "settings" : { // "analysis" : { // "analyzer" : { // "ik-自定义" : { // "tokenizer" : "ik_max_word" // } // } // } // }, "mappings" : { "_default_":{ "_all": { "enabled": false } // 关闭_all字段,因为我们只搜索title字段 }, "jdbc" : { "dynamic" : true,// 是否启用“动态修改索引” "properties" : { "title" : { "type" : "string", "analyzer" : "ik_max_word" } } } } }'
创建一个名叫 product 的索引,分词器用 ik_max_word,并创建一个 jdbc 的类型,里面有一个 title 的字段,指定其使用 ik_max_word 分词器
jdbc:类型
settings注释掉部分,表示可以自定义分析器,然后由 mappings中的analyzer 调用
ik-自定义这个分析器
创建配置
官方文档地址https://www.elastic.co/guide/en/logstash/current/plugins-inputs-jdbc.html
进入 logstash根目录
mkdir -p etc cd etc vim product.conf
加入如下:
input { stdin { } jdbc { # mysql 数据库链接,shop为数据库名 jdbc_connection_string => "jdbc:mysql://192.168.1.254:3306/shop" # 用户名和密码 jdbc_user => "root" jdbc_password => "root" # 驱动 jdbc_driver_library => "/home/hadoop/logstash-5.2.1/mysql-connector-java-5.1.40-bin.jar" # 驱动类名 jdbc_driver_class => "com.mysql.jdbc.Driver" jdbc_paging_enabled => "true" jdbc_page_size => "50000" # 执行的sql 文件路径+名称 statement_filepath => "/home/hadoop/logstash-5.2.1/etc/product.sql" # 设置监听间隔 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新 schedule => "* * * * *" # 索引类型 type => "jdbc" } } filter { json { source => "message" remove_field => ["message"] } } output { elasticsearch { hosts => ["10.1.5.101:9200"] index => "product" document_id => "%{id}" } stdout { codec => json_lines } }
说明
index:索引名称
hosts:ES的IP地址及端口
document_id:自增ID
json_lines:JSON格式输出
接着创建 product.sql
vim product.sql
加入
SELECT product_id AS id , title , market_price , shop_price , content , add_time , update_time FROM zh_product WHERE is_del = 0 AND is_open = 1 AND update_time >=date_add(:sql_last_value,interval 8 hour)
Logstash 修改时区 取巧方法,利用mysql数据库内置函数
date_add(:sql_last_value,interval 8 hour) Logstash使用的标准时间并没有使用的是本地时区时间,这里取个巧,在他的时间基础上增加8小时,不用修改 Logstash 的时区了
说明:
sql_last_value是每次读取数据库表时更新此值,表示最后更新时间
如果input里面use_column_value 设置为true,可以是我们设定的字段的上一次的值。
默认 use_column_value 为 false,这样 :sql_last_value为上一次更新的最后时间,
这样对于新增的值,才会更新,这样就实现了增量更新的目的。
具体请看
https://www.elastic.co/guide/en/logstash/5.2/plugins-inputs-jdbc.html#_state
启动
bin/logstash -f etc/product.conf
查看
浏览器中打开http://10.1.5.101:9200/product/_search 或 http://10.1.5.101:9200/product/jdbc/_search[/code]分词搜索
curl -XPOST "http://10.1.5.101:9200/product/jdbc/_search?pretty" -d' { "query" : { "match" : { "title" : "德国Aptamil爱他美婴幼儿配方奶粉2+段(适合2岁以上宝宝)600g" }}, "from" : 0, "size" : 100, "highlight" : { "pre_tags" : ["<font color='red'>"], "post_tags" : ["</font>"], "fields" : { "title" : {} } } }'
说明:
pretty:加入pretty时,返回的JSON将格式化,方便阅读调试
from定义了目标数据的偏移值
size定义当前返回的事件数目
上面两个综合起来使用就是分页
默认from为0,size为100,即所有的查询默认仅仅返回前100条数据,从第0条开始计数,直到返回100条数据
结果哈哈,保密
这里用了高亮属性 highlight,直接显示到 html 中,被匹配到的字或词将以红色突出显示。若要用过滤搜索,直接将 match 改为 term 即可
参考:
http://blog.csdn.net/laoyang360/article/details/51747266
http://keenwon.com/1404.html
http://blog.csdn.net/jam00/article/details/52983056
http://www.cnblogs.com/xing901022/p/5235993.html
http://mt.sohu.com/20170102/n477576527.shtml
http://www.cnblogs.com/phpshen/p/6098333.html
http://zhaoyanblog.com/archives/495.html
https://my.oschina.net/xiaohui249/blog/232784
相关文章推荐
- 使用logstash-6.2.2和logstash-input-jdbc插件实现mysql数据同步到Elasticsearch
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- ElasticSearch5.4.3使用logstash的logstash-input-jdbc实现mysql数据同步
- Logstash使用jdbc_input同步Mysql数据时遇到的空时间SQLException问题
- Elasticsearch同步mysql(logstash-input-jdbc)和一些查询问题
- logstash-input-jdbc实现ElasticSearch与mysql同步
- logstash-input-jdbc实现mysql 与elasticsearch实时同步深入详解
- logstash-input-jdbc实现mysql 与elasticsearch实时同步
- logstash-input-jdbc 实时同步mysql数据
- logstash-input-jdbc实现mysql 与elasticsearch实时同步深入详解
- logstash-input-jdbc实现mysql 与elasticsearch实时同步深入详解
- logstash-input-jdbc实现mysql 与elasticsearch实时同步深入详解
- logstash-input-jdbc实现mysql 与elasticsearch实时同步深入详解
- Logstash使用jdbc同步MySQL中的数据
- 利用Logstash的logstash-input-jdbc插件实现mysql增量导入ES
- elasticSearch数据导入工具logstash-input-jdbc 同步原理及相关问题解读
- Elasticsearch和mysql数据同步(elasticsearch-jdbc)