使用Logstash将MySql数据迁移到Elasticsearch中
2017-03-18 00:00
816 查看
摘要: 使用Logstash将MySql数据迁移到Elasticsearch中
在许多情况下,我们希望使用不是由Elasticsearch本身支持的不同数据库的输入。在本文中,我们将展示如何通过Logstash将数据从MySql数据库迁移到Elasticsearch。
以下框图说明了JDBC连接器插件在从JDBC支持的数据库迁移数据时的作用:
在图中,我们有logstash运行配置文件,触发我们设置的预定义查询,以便将我们感兴趣的数据收集到顺序数据库。一旦查询被触发到JDBC插件,它将它传递到数据库并收集数据,它将移交给Logstash。
根据我们的要求,我们可以处理数据并以所需的形式进行处理,在处理后将处理的数据索引到Elasticsearch。我们将在后面的章节中的示例中显示详细的应用程序。
数据库现在已创建,我们只是确保我们使用相同的数据库用于我们的目的:
使用以下模式创建
现在在上表中插入一些测试数据:
我们创建了一个包含3名员工详细信息的表。您可以通过传递查询来显示表的详细信息以显示其所有内容:
结果表将如下所示:
在上面的配置文件中,我们提到了许多参数,例如:JDBC连接器检查数据的数据库,JDBC插件的位置,MySql访问的用户名和密码,以及查询语句。将上述设置应用于
如JDBC部分中的框图中所述,logstash配置文件将查询传递给JDBC驱动程序以及用户凭据。它还获取数据并将数据提供给Logstash。Logstash将使其JSON格式化并索引到Elasticsearch数据库。查询索引
上述查询将每行作为单独的文档列出,列为字段。一个例子:
另外,更新同一表格中一行的现有值,如下所示:
我们如何解决这种重复?我们必须为每个文档提供一个唯一的ID。对于每次运行,每个文档的ID应该相同,以防止重复问题。这可以通过编辑conf文件的输出部分来实现,如下所示:
为了实现此要求,请使用该
检查所获取的数据
对于增量更新,请修改查询以
用于情况1,2和3的完整配置文件如下:
为了看到上面的配置工作,添加几个字段到现有的MySql表,其
在许多情况下,我们希望使用不是由Elasticsearch本身支持的不同数据库的输入。在本文中,我们将展示如何通过Logstash将数据从MySql数据库迁移到Elasticsearch。
JBDC插件
Logstash可用的JDBC插件确保来自任何具有JDBC接口的数据库的数据都可以作为输入存入Logstash。这个插件还支持需要调度运行logstash。它还通过使用查询使选择性数据作为输入。在这些类型的数据库中,我们有行和列的概念。每个行都被视为单个事件,每个行(事件)中的列都被视为事件中的字段。以下框图说明了JDBC连接器插件在从JDBC支持的数据库迁移数据时的作用:
在图中,我们有logstash运行配置文件,触发我们设置的预定义查询,以便将我们感兴趣的数据收集到顺序数据库。一旦查询被触发到JDBC插件,它将它传递到数据库并收集数据,它将移交给Logstash。
根据我们的要求,我们可以处理数据并以所需的形式进行处理,在处理后将处理的数据索引到Elasticsearch。我们将在后面的章节中的示例中显示详细的应用程序。
将MySql数据插入Elasticsearch
让我们继续在Logstash的帮助下将数据从顺序数据库(如MySql)迁移到Elasticsearch。我们需要相应的MySql的JDBC驱动程序。您可以 在这里下载。 现在让我们"testdb"使用以下命令创建一个在MySql 中命名的数据库:
创建 testdb
数据库现在已创建,我们只是确保我们使用相同的数据库用于我们的目的:
显示 数据库 ; 使用 testdb;
使用以下模式创建
"testtable"在数据库下命名的表
"testdb":
创建 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));
现在在上表中插入一些测试数据:
INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810'); INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460'); INSERT INTO testtable(PersonID,LastName,FirstName,City,
我们创建了一个包含3名员工详细信息的表。您可以通过传递查询来显示表的详细信息以显示其所有内容:
select * from testtable
结果表将如下所示:
Logstash配置
现在我们已经创建了一个内容如上所示的MySql表,看看如何配置Logstash。在logstash文件夹中,我们有一个logstash.conf文件,它是要配置和运行以获取必要的结果的文件。初始配置如以下屏幕截图所示:
在上面的配置文件中,我们提到了许多参数,例如:JDBC连接器检查数据的数据库,JDBC插件的位置,MySql访问的用户名和密码,以及查询语句。将上述设置应用于
"logstash.conf"文件后,通过键入以下命令运行Logstash:
bin / logstash -f logstash .conf
如JDBC部分中的框图中所述,logstash配置文件将查询传递给JDBC驱动程序以及用户凭据。它还获取数据并将数据提供给Logstash。Logstash将使其JSON格式化并索引到Elasticsearch数据库。查询索引
"test-migrate"如下:
curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true' -d '{}'
上述查询将每行作为单独的文档列出,列为字段。一个例子:
{ “_index” : “测试迁移”, “_type” : “数据”, “_id” :“4004” , “_score” :1, “_source”:{ “PERSONID” :4004, “姓氏”:“男爵“, ”名字“:”理查“, ”城市“:”开普敦“, ”日期“:”2016-05-23T10:42:04.370Z“ , “@version”:“1”, “@timestamp”:“2016-07-10T10:36:43.685Z” }} }}
更多配置
在本节中,我们将展示各种用例场景。向上面的MySql中添加另一行数据,如下所示:INSERT INTO testtable(PersonID,LastName,FirstName,City,Date) VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');
另外,更新同一表格中一行的现有值,如下所示:
UPDATE测试表 - > SET FirstName = 'James' - > WHERE PersonID = 4005 ;
1.重复问题
完成上述步骤后,再次运行logstash配置文件。我们期望总共4个文档包括新的行和更新的行。但是,当再次检查索引时,不是这样的。相反,我们共有7个文件。这是因为初始文档在elasticsearch数据库中保持不变,这是由于没有从表本身提供特定的id。当我们运行logstash配置时,整个内容"testtable"都会被索引一次。
我们如何解决这种重复?我们必须为每个文档提供一个唯一的ID。对于每次运行,每个文档的ID应该相同,以防止重复问题。这可以通过编辑conf文件的输出部分来实现,如下所示:
2.变换操作
我们遇到的其他重要需求之一将是更改字段名称和值,当我们索引到elasticsearch。我们在我们当前的例子中添加一个要求来演示这个。对于所有与“开普敦”匹配的文档,该字段"City"应替换为值
"South Africa",字段值应替换为
"Country"。
为了实现此要求,请使用该
"filter"属性在elasticsearch中操作已提取的数据。使用
"mutate"里面的属性
"filter"执行所需的更改。使用上面的
"input"和
"output"部分的设置,我们需要
"filter"在
logstash.conf文件中添加以下部分:
filter { if [city] == “开普敦” { mutate { rename => { “city” => “country” } replace => [ “country”,“South Africa” ] }} }} }}
检查所获取的数据
"Cape Town"的每个事件的
"City"列的值。如果找到匹配项,
"City"则重命名该字段,
"country"并将每个匹配的值替换为
"South Africa"。
3.计划和增量更新
如果数据在MySql数据库中不断更新,我们需要递增和定期对其进行索引,该怎么办?要定期获取数据,请"scheduler"在输入部分中添加属性。在
"scheduler,"给定的值时,让每隔一段时间运行conf文件。它是高度可定制的,并使用Rufus调度程序语法。
对于增量更新,请修改查询以
"sql_last_value"针对字段使用。这里我们给这个字段
"Date"。我们还设置
"use_column_value"为true,并将对应的列链接到
"Date"使用
"tracking_column"。
用于情况1,2和3的完整配置文件如下:
为了看到上面的配置工作,添加几个字段到现有的MySql表,其
"Date"值比之前存在的值更新。现在运行logstash,您可以看到只有新数据已在Elasticsearch索引中建立索引。
结论
在本文中,我们讨论了用于使用logstash将数据从连续数据库迁移到Elasticsearch的JDBC插件。我们还熟悉如何处理常见问题,如重复,字段和值的突变,以及调度和增量更新。问题/意见?给我们一行下面。相关文章推荐
- logstash使用template提前设置好maping同步mysql数据到Elasticsearch5.5.2
- 使用logstash-6.2.2和logstash-input-jdbc插件实现mysql数据同步到Elasticsearch
- ElasticSearch5.4.3使用logstash的logstash-input-jdbc实现mysql数据同步
- 使用 Spring、Elasticsearch 及 Logstash 构建企业级数据搜索和分析平台
- 使用 Spring、Elasticsearch 及 Logstash 构建企业级数据搜索和分析平台
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- 利用Logstash插件进行Elasticsearch与Mysql的数据
- 【elasticsearch】使用工具迁移索引数据
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- 使用logstash将kafka数据入到elasticsearch
- Logstash的logstash-input-jdbc插件mysql数据同步ElasticSearch及词库
- DB2数据迁移到mysql,使用insert语句
- 使用SQLyog将sqlserver的数据及表结构迁移到mysql中
- elasticsearch 重建索引 使用python迁移索引数据 reindex
- 使用 Docker 部署和迁移多节点的 ElasticSearch-Logstash-Kibana 集群
- ElasticSearch5+logstash的logstash-input-jdbc实现mysql数据同步
- Elasticsearch系列(九)----使用Logstash-input-jdbc同步数据库中的数据到ES
- HBase——使用Put迁移MySql数据到Hbase
- elasticsearch5.2.1使用logstash同步mysql
- Logstash使用jdbc_input同步Mysql数据时遇到的空时间SQLException问题