您的位置:首页 > 其它

Logstash:使用 XML filter 来导入文件

2020-08-31 16:20 701 查看

JSON 是目前非常流行的一种存储文件的格式,但是在实际的应用中,也有很多的文件格式是  XML 格式的。那么我们该如何来处理 XML 格式的文件并把它们导入到 Elasticsearch 中呢?在今天的文章中,我们将以一个例子来说明。我们将使用 XML filter 来导入 XML 格式的数据。

如何在 ElasticSearch 中导入我的自定义 XML 文件,幸运的是 Logstash 可以为你提供帮助。 让我们创建一个示例 XML 文件,该文件要导入到 Elasticsearch 中。 复制下面的文本并将其另存为 “test1.xml”,你也可以使用自己的XML。

test1.xml

[code]<xmldata>
<head1>
<key1>Value1</key1>
<key2>Value2</key2>
<id>0001</id>
<date>Aug 13 2011 00:03:44</date>
</head1>
<head2>
<key3>Value3</key3>
</head2>
</xmldata>

创建 Logstash 配置文件

Logstash 可以使用输入文件下的多行选项来读取 XML 文件。 下面显示示例配置文件;

[code]input {
file {
path => "/Users/liuxg/elastic/logstash-7.9.0/test1.xml"
start_position => beginning
sincedb_path => "/dev/null"

codec => multiline {
pattern => "^<\?xmldata .*\>"
negate => true
what => "previous"
auto_flush_interval => 1
}
}
}

在你的使用中,你需要根据自己的实际情况进行修改上面的 test1.xml 的路径。

配置的过滤器部分将读取 XML。 这个例子将过滤掉特定的值: id,date, key1 及 key3。 另外,date 值将转换为 Elasticsearch 和 Kibana 正确使用的值。

[code]filter {
xml {
store_xml => false
source => "message"
xpath => [
"/xmldata/head1/id/text()", "id",
"/xmldata/head1/date/text()", "date",
"/xmldata/head1/key1/text()", "key1",
"/xmldata/head2/key3/text()", "key3"
]
}

mutate {
add_field => { "timestamp" => "%{[date][0]}" }
}

date {
match => [ "timestamp" , "MMM dd yyyy HH:mm:ss" ]
locale => en
}

mutate {
remove_field => ["message", "timestamp"]
}
}

在输出部分下,我们配置 Elasticsearch 和 stout,以便我们可以直接在控制台中查看输出。 在此示例中,从 XML 文件使用 Elasticsearch document_id。 可选,您可以设置 document_type 部分。

[code]output {
stdout { codec => rubydebug }

elasticsearch {
index => "logstash-xml"
hosts => ["localhost:9200"]
document_id => "%{[id]}"
}
}

运行 Logstash

将三个部分(input/filter/output)放在一起时,便具有 LogStash 的完整配置文件。 将此文件另存为 logstash-xml.conf:

[code]input {
file {
path => "/Users/liuxg/elastic/logstash-7.9.0/test1.xml"
start_position => beginning
sincedb_path => "/dev/null"

codec => multiline {
pattern => "^<\?xmldata .*\>"
negate => true
what => "previous"
auto_flush_interval => 1
}
}
}

filter {
xml {
store_xml => false
source => "message"
xpath => [
"/xmldata/head1/id/text()", "id",
"/xmldata/head1/date/text()", "date",
"/xmldata/head1/key1/text()", "key1",
"/xmldata/head2/key3/text()", "key3"
]
}

mutate {
add_field => { "timestamp" => "%{[date][0]}" }
}

date {
match => [ "timestamp" , "MMM dd yyyy HH:mm:ss" ]
locale => en
}

mutate {
remove_field => ["message", "timestamp"]
}
}

output {
stdout { codec => rubydebug }

elasticsearch {
index => "logstash-xml"
hosts => ["localhost:9200"]
document_id => "%{[id]}"
}
}

你可以使用以下命令测试配置:

[code]sudo ./bin/logstash -f logstash-xml.conf --configtest

直接运行配置文件:

[code]sudo ./bin/logstash -f logstash-xml.conf

我们可以在 Logstash 的 console 里看到如下的信息:

我们可以在 Elasticsearch 中看到被导入的数据:

[code]GET logstash-xml/_search
[code]{
"took" : 0,
"timed_out" : false,
"_shards" : {
"total" : 1,
"successful" : 1,
"skipped" : 0,
"failed" : 0
},
"hits" : {
"total" : {
"value" : 1,
"relation" : "eq"
},
"max_score" : 1.0,
"hits" : [
{
"_index" : "logstash-xml",
"_type" : "_doc",
"_id" : "0001",
"_score" : 1.0,
"_source" : {
"@timestamp" : "2011-08-12T16:03:44.000Z",
"key3" : [
"Value3"
],
"path" : "/Users/liuxg/elastic/logstash-7.9.0/test1.xml",
"id" : [
"0001"
],
"@version" : "1",
"host" : "liuxg",
"tags" : [
"multiline"
],
"key1" : [
"Value1"
],
"date" : [
"Aug 13 2011 00:03:44"
]
}
}
]
}
}

我们可以看到 _id 值为 “0001”,也就是在我们的 XML 文件中所定义的值。

内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: