您的位置:首页 > 数据库 > MySQL

使用Logstash将MySql数据迁移到Elasticsearch中

2017-03-18 00:00 816 查看
摘要: 使用Logstash将MySql数据迁移到Elasticsearch中

在许多情况下,我们希望使用不是由Elasticsearch本身支持的不同数据库的输入。在本文中,我们将展示如何通过Logstash将数据从MySql数据库迁移到Elasticsearch。

JBDC插件

Logstash可用的JDBC插件确保来自任何具有JDBC接口的数据库的数据都可以作为输入存入Logstash。这个插件还支持需要调度运行logstash。它还通过使用查询使选择性数据作为输入。在这些类型的数据库中,我们有行和列的概念。每个行都被视为单个事件,每个行(事件)中的列都被视为事件中的字段。

以下框图说明了JDBC连接器插件在从JDBC支持的数据库迁移数据时的作用:



在图中,我们有logstash运行配置文件,触发我们设置的预定义查询,以便将我们感兴趣的数据收集到顺序数据库。一旦查询被触发到JDBC插件,它将它传递到数据库并收集数据,它将移交给Logstash。

根据我们的要求,我们可以处理数据并以所需的形式进行处理,在处理后将处理的数据索引到Elasticsearch。我们将在后面的章节中的示例中显示详细的应用程序。

将MySql数据插入Elasticsearch

让我们继续在Logstash的帮助下将数据从顺序数据库(如MySql)迁移到Elasticsearch。我们需要相应的MySql的JDBC驱动程序。您可以 在这里下载 现在让我们
"testdb"
使用以下命令创建一个在MySql 中命名的数据库:

创建 testdb

数据库现在已创建,我们只是确保我们使用相同的数据库用于我们的目的:

显示 数据库 ;
使用 testdb;

使用以下模式创建
"testtable"
在数据库下命名的表
"testdb"


创建 表 testtable(PersonID int,LastName varchar(255),FirstName varchar(255),City varchar(255),Date datetime(6));

现在在上表中插入一些测试数据:

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
VALUES('4005','Kallis','Jaques','Cape Town','2016-05-23 16:12:03.568810');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
VALUES('4004','Baron','Richard','Cape Town','2016-05-23 16:12:04.370460');
INSERT  INTO testtable(PersonID,LastName,FirstName,City,

我们创建了一个包含3名员工详细信息的表。您可以通过传递查询来显示表的详细信息以显示其所有内容:

select * from testtable

结果表将如下所示:



Logstash配置

现在我们已经创建了一个内容如上所示的MySql表,看看如何配置Logstash。在logstash文件夹中,我们有一个
logstash.conf
文件,它是要配置和运行以获取必要的结果的文件。初始配置如以下屏幕截图所示:



在上面的配置文件中,我们提到了许多参数,例如:JDBC连接器检查数据的数据库,JDBC插件的位置,MySql访问的用户名和密码,以及查询语句。将上述设置应用于
"logstash.conf"
文件后,通过键入以下命令运行Logstash:

bin / logstash -f logstash .conf

如JDBC部分中的框图中所述,logstash配置文件将查询传递给JDBC驱动程序以及用户凭据。它还获取数据并将数据提供给Logstash。Logstash将使其JSON格式化并索引到Elasticsearch数据库。查询索引
"test-migrate"
如下:

curl -XPOST'http :// localhost:9200 / test-migrate / _search?pretty = true'  -d  '{}'

上述查询将每行作为单独的文档列出,列为字段。一个例子:

{
“_index” : “测试迁移”,
“_type” : “数据”,
“_id” :“4004” ,
“_score” :1,
“_source”:{
“PERSONID” :4004,
“姓氏”:“男爵“,
”名字“:”理查“,
”城市“:”开普敦“,
”日期“:”2016-05-23T10:42:04.370Z“ ,
“@version”:“1”,
“@timestamp”:“2016-07-10T10:36:43.685Z”
}}
}}


更多配置

在本节中,我们将展示各种用例场景。向上面的MySql中添加另一行数据,如下所示:

INSERT  INTO testtable(PersonID,LastName,FirstName,City,Date)
VALUES('4002','Cheers','Joan','Cape Town','2016-05-23 16:12:07.163681');

另外,更新同一表格中一行的现有值,如下所示:

UPDATE测试表
- > SET FirstName = 'James'
- > WHERE PersonID = 4005 ;


1.重复问题

完成上述步骤后,再次运行logstash配置文件。我们期望总共4个文档包括新的行和更新的行。但是,当再次检查索引时,不是这样的。相反,我们共有7个文件。这是因为初始文档在elasticsearch数据库中保持不变,这是由于没有从表本身提供特定的id。当我们运行logstash配置时,整个内容
"testtable"
都会被索引一次。

我们如何解决这种重复?我们必须为每个文档提供一个唯一的ID。对于每次运行,每个文档的ID应该相同,以防止重复问题。这可以通过编辑conf文件的输出部分来实现,如下所示:



2.变换操作

我们遇到的其他重要需求之一将是更改字段名称和值,当我们索引到elasticsearch。我们在我们当前的例子中添加一个要求来演示这个。对于所有与“开普敦”匹配的文档,该字段
"City"
应替换为值
"South Africa"
,字段值应替换为
"Country"


为了实现此要求,请使用该
"filter"
属性在elasticsearch中操作已提取的数据。使用
"mutate"
里面的属性
"filter"
执行所需的更改。使用上面的
"input"
"output"
部分的设置,我们需要
"filter"
logstash.conf
文件中添加以下部分:

filter {
if [city] == “开普敦” {
mutate {
rename => { “city” => “country” }
replace => [ “country”,“South Africa” ]
}}
}}
}}

检查所获取的数据
"Cape Town"
的每个事件的
"City"
列的值。如果找到匹配项,
"City"
则重命名该字段,
"country"
并将每个匹配的值替换为
"South Africa"


3.计划和增量更新

如果数据在MySql数据库中不断更新,我们需要递增和定期对其进行索引,该怎么办?要定期获取数据,请
"scheduler"
在输入部分中添加属性。在
"scheduler,"
给定的值时,让每隔一段时间运行conf文件。它是高度可定制的,并使用Rufus调度程序语法

对于增量更新,请修改查询以
"sql_last_value"
针对字段使用。这里我们给这个字段
"Date"
。我们还设置
"use_column_value"
为true,并将对应的列链接到
"Date"
使用
"tracking_column"


用于情况1,2和3的完整配置文件如下:



为了看到上面的配置工作,添加几个字段到现有的MySql表,其
"Date"
值比之前存在的值更新。现在运行logstash,您可以看到只有新数据已在Elasticsearch索引中建立索引。

结论

在本文中,我们讨论了用于使用logstash将数据从连续数据库迁移到Elasticsearch的JDBC插件。我们还熟悉如何处理常见问题,如重复,字段和值的突变,以及调度和增量更新。问题/意见?给我们一行下面。
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: