您的位置:首页 > 其它

Logstash使用介绍

2017-09-28 10:30 495 查看

Logstash介绍

Logstash是一个数据收集处理转发系统,是 Java开源项目。 它只做三件事:

数据输入

数据加工(不是必须的):如过滤,改写等

数据输出





下载安装

logstash是基于Java的服务,各操作系统安装Java环境均可使用。

Java
https://www.java.com/zh_CN/
安装后配置好java环境变量。
logstash
最新版 https://www.elastic.co/downloads/logstash
2.3.4版 https://www.elastic.co/downloads/past-releases/logstash-2-3-4

配置结构

#输入
Input{
Jdbc{}
}
#加工过滤
Filter{
Json{}
}
#输出
Output{
elasticsearch{}
}


支持的插件

Input: elasticsearch,file,http_poller,jdbc,log4j,rss,rabbitmq,redis,syslog,tcp,udp…等

Filter: grok,json,mutate,split …等

Output: email,elasticsearch,file,http,mongodb,rabbitmq,redis,stdout,tcp,udp …等

配置说明地址https://www.elastic.co/guide/en/logstash/current/input-plugins.html

应用示例

#jdbc_demo
input {
jdbc {
#数据库链接设置
jdbc_driver_library => "D:\elk\新建文件夹\sqljdbc_6.0\chs\sqljdbc42.jar"
jdbc_driver_class => "com.microsoft.sqlserver.jdbc.SQLServerDriver"
jdbc_connection_string => "jdbc:sqlserver://192.168.8.61;user=sa;password=sa;DatabaseName=EPPartnerStatistic;"
jdbc_user => "sa"
jdbc_password => "sa"

statement_filepath => "../config-demo/sqlscript/jdbc_demo.sql"         #sql脚本
schedule => "* * * * *"                                                #执行计划

record_last_run => true                                                #记录最后运行值
use_column_value => true                                               #
tracking_column => id                                                  #要记录的字段
last_run_metadata_path => "../config-demo/log/jdbc_demo"               #记录的位置

lowercase_column_names => true                                         #将字段名全部改为小写
clean_run => false

jdbc_fetch_size => 1000                                                #分页设置
jdbc_page_size => 1000
jdbc_paging_enabled => true
}
}
filter {
mutate {
#重命名,可以将字段名改掉
rename => {
"id" => "Id"
"bid" => "BId"
"saleconsultantid" => "SaleConsultantId"
"avgreplyduration" => "AvgReplyDuration"
"avgreplyratio" => "AvgReplyRatio"
"avgonlineduration" => "AvgOnlineDuration"
"stattime" => "StatTime"
}
}
}
output {
elasticsearch {
hosts => ["192.168.1.13:9200"]                            #es 服务地址
index => "jdbc_demo"                                       #索引的名字
document_type => "demoinfo"                                #类型的名字
workers => 1                                               #输出时进程数
document_id=>"%{Id}"                                       #文档的唯一ID
template => "../config-demo/templates/jdbc_demo.json"      #模板的路径
template_name => "jdbc_demo"                               #模板的名字
template_overwrite => false                                ##是否覆盖已存在的模板
}
#  stdout{
#     codec => rubydebug
# }
}


索引模板

{
"order": 1,
"template": "jdbc_demo",                  //模板匹配规则,已经索引名匹配(eg:jdbc_demo-*,可以匹配到,jdbc_demo-1,jdbc_demo-14...)
"settings": {
"index.number_of_shards": 4,          //分片数
"number_of_replicas": 1               //每个分配备份数
},
"mappings": {
"_default_": {
"_source": {
"enabled": true
}
},
"demoinfo": {           //动态模板 如果映射后,有新的字段添加进来,并且在字段区域没有映射会按该动态模板匹配映射
"dynamic_templates": [
{
"string_field": {
"match": "*",
"match_mapping_type": "string", //匹配到所有字符串 设置为不分词
"mapping": {
"index": "not_analyzed",
"type": "string"
}
}
}
],                      //类型名
"_source": {
"enabled": true
},
"_all": {
"enabled": false
},
"properties": {                 //字段区域
"Id": {
"type": "long"
},
"BId": {
"type": "integer"
},
"SaleConsultantId": {
"type": "integer"
},
"AvgReplyDuration": {
"type": "integer"
},
"AvgReplyRatio": {
"type": "double"
},
"AvgOnlineDuration": {
"type": "integer"
},
"StatTime": {
"format": "strict_date_optional_time||epoch_millis",
"type": "date"
}
}
}
},
"aliases": {
"logstashdemo": {}       //别名,匹配到该模板的会设置一个别名
}
}


Sqlserver 查询语句

SELECT  [Id]
,[BId]
,[SaleConsultantId]
,[AvgReplyDuration]
,[AvgReplyRatio]
,[AvgOnlineDuration]
,[StatTime]
FROM [EPPartnerStatistic].[dbo].[StatSessionBy7Day]
WHERE Id>:sql_last_value
--:sql_last_value是记录的最后的一个值


5.x模板String字段

{"properties": {                 //字段区域
"NewField": {
"type": "keyword",       // keyword 不分词
"index": false            //不建立索引
},
"NewFieldText": {
"type": "text",           // text分词
"index": true            // 建立索引
}
}
}


注意事项

Jdbc(input)拉取数据使用分页功能时无法查询text、ntext 和 image字段。

jdbc(input)使用分页时会将字段全部转换为大写。

elasticsearch(output)的模板中匹配符,一定要能够匹配到索引名称才能生效,并且要避免让其他索引匹配到,以免影响其它新索引。

elasticsearch(output)定义索引名称必须全小写(es限制)。

以时间字段进行跟踪时sql查询语句不能使用Top x限制每次查询条数,这会导致最后的记录值并非是最大的值,所拉取的数据可能会出现数据重复拉取(但是在es中不会体现为多条重复数据,只是version字段会>1)或数据丢失。

跟踪主键ID时需显示设置 order by id asc

无检索文本内容需设置“index”: “not_analyzed”

资料

es字段对应类型 https://www.elastic.co/guide/en/elasticsearch/reference/2.3/mapping-types.html
logstash官网地址 https://www.elastic.co/guide/en/logstash/2.3/configuration.html

sqlservice jdbc jar包http://www.microsoft.com/zh-cn/download/details.aspx?id=11774&a03ffa40-ca8b-4f73-0358-c191d75a7468=True
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签: