您的位置:首页 > 数据库 > MySQL

logstash-input-jdbc实现mysql 与elasticsearch实时同步

2018-10-16 15:37 453 查看

实现MySQL数据库中数据到Elasticsearch的实时同步:

首先需要做好的准备工作:

1、服务器上安装好elasticsearch和logstash

2、安装logstash-input-jdbc插件,但从logstash5.X开始,已经至少集成了logstash-input-jdbc插件。所以,你如果使用的是logstash5.X,可以不必再安装,可以直接跳过这一步。

插件安装可参考:http://blog.csdn.net/yeyuma/article/details/50240595#quote 

注:在以上配置镜像的过程中taobao的镜像地址已经改为如下地址:http://gems.ruby-china.com/ added to sources

 

如何同步?

1、建立数据库和表

数据库:test

表:item

CREATE TABLE `item` (
  `item_id` int(11) NOT NULL,
  `id` int(11) NOT NULL,
  `name` varchar(255) DEFAULT NULL,
  `price` decimal(10,2) DEFAULT NULL,
  `create_time` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (`item_id`),
  KEY `user` (`id`),
  CONSTRAINT `user` FOREIGN KEY (`id`) REFERENCES `user` (`id`) ON DELETE CASCADE ON UPDATE CASCADE
) ENGINE=InnoDB DEFAULT CHARSET=latin1;

2、准备一个es-mysql.conf文件

配置如下:

input {
    stdin {
    }
    jdbc {
      jdbc_connection_string => "jdbc:mysql://node01:3306/test"
      jdbc_user => "root"
      jdbc_password => "root"
      jdbc_driver_library => "/home/mysql-connector-java-5.1.28-bin.jar"
      jdbc_driver_class => "com.mysql.jdbc.Driver"
      jdbc_paging_enabled => "true"
      jdbc_page_size => "50000"

      #下面两个参数是增量导入和更新的关键

      #use_column_value设置为true时,使用tracking_column定义的值作为sql_last_value,默认为false
      #use_column_value => true

      # sql_last_value用于计算要查询的行的值。在运行任何查询之前,将其设置为1970年1月1日星期四,如果  use_column_value为true,则tracking_column则设置为0。在后续查询运行后,它会相应更新。
      #tracking_column => "create_time"

      #两种sql查询方法,一种直接书写在conf中,另一种从配置文件中读取
      #statement_filepath => "/bigdata/logstash-2.3.1/conf/es-jdbc/es-mysql.sql"
      statement => "select * from item where create_time>=:sql_last_value"

      #定时字段 各字段含义(由左至右)分、时、天、月、年,全部为*默认含义为每分钟都更新
      schedule => "* * * * *"
      type => "jdbc"
    }
}

filter {
    json {
        source => "message"
        remove_field => ["message"]
    }
}

output {
    elasticsearch {

        #ES的地址与端口
        hosts => ["node01:9200", "node02:9200", "node03:9200"]

    # ES的索引,自己定义
    index => "mysql-es"

    # ES的唯一id标识
        document_id => "%{item_id}"
    codec => plain {
        charset => "UTF-16BE"
      }
    }
    stdout{
        codec => json_lines
    }
}

 

执行配置文件:

[root@node01 logstash-2.3.1]# ./bin/logstash -f ./conf/es-jdbc/es-mysql.conf 

出现如下,表示运行成功:

阅读更多
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: