您的位置:首页 > 其它

ELK日志分析平台搭建全过程

2017-12-07 00:00 197 查看
一、使用背景

当生产环境有很多服务器、很多业务模块的日志需要每时每刻查看时

二、环境

系统:centos6.5

JDK:1.8

Elasticsearch-5.0.0

Logstash-5.0.0

kibana-5.0.0

三、安装

1、安装JDK

下载JDK:http://www.oracle.com/technetwork/java/javase/downloads/jdk8-downloads-2133151.html

本环境下载的是64位tar.gz包,将安装包拷贝至安装服务器/usr/local目录

[root@localhost~]#cd/usr/local/
[root@localhostlocal]#tar-xzvfjdk-8u111-linux-x64.tar.gz

配置环境变量

[root@localhostlocal]#vim/etc/profile

将下面的内容添加至文件末尾(假如服务器需要多个JDK版本,为了ELK不影响其它系统,也可以将环境变量的内容稍后添加到ELK的启动脚本中)

JAVA_HOME=/usr/local/jdk1.8.0_111
JRE_HOME=/usr/local/jdk1.8.0_111/jre
CLASSPATH=.:$JAVA_HOME/lib:/dt.jar:$JAVA_HOME/lib/tools.jar
PATH=$PATH:$JAVA_HOME/bin
exportJAVA_HOME
exportJRE_HOME

ulimit-u4096

[root@localhostlocal]#source/etc/profile

配置limit相关参数

[root@localhostlocal]#vim/etc/security/limits.conf
添加以下内容

*softnproc65536
*hardnproc65536
*softnofile65536
*hardnofile65536

创建运行ELK的用户

[root@localhostlocal]#groupaddelk

[root@localhostlocal]#useradd-gelkelk



创建ELK运行目录

[root@localhostlocal]#mkdir/elk
[root@localhostlocal]#chown-Relk:elk/elk

关闭防火墙:

[root@localhost~]#iptables-F

以上全部是root用户完成

2、安装ELK

以下由elk用户操作

以elk用户登录服务器

下载ELK安装包:https://www.elastic.co/downloads,并上传到服务器且解压,解压命令:tar-xzvf包名



配置Elasticsearch



修改如下内容:





保存退出

启动Elasticsearch



查看是否启动成功



用浏览器访问:http://192.168.10.169:9200



Elasticsearch安装完毕

安装logstash

logstash是ELK中负责收集和过滤日志的



编写配置文件如下:



解释:

logstash的配置文件须包含三个内容:

input{}:此模块是负责收集日志,可以从文件读取、从redis读取或者开启端口让产生日志的业务系统直接写入到logstash

filter{}:此模块是负责过滤收集到的日志,并根据过滤后对日志定义显示字段

output{}:此模块是负责将过滤后的日志输出到elasticsearch或者文件、redis等

本环境采用从文件读取日志,业务系统产生日志的格式如下:

[2016-11-0500:00:03,731INFO][http-nio-8094-exec-10][filter.LogRequestFilter]-/merchant/get-supply-detail.shtml,IP:121.35.185.117,[device-dpi=414*736,version=3.6,device-os=iOS8.4.1,timestamp=1478275204,bundle=APYQ9WATKK98V2EC,device-network=WiFi,token=393E38694471483CB3686EC77BABB496,device-model=iPhone,device-cpu=,sequence=1478275204980,device-uuid=C52FF568-A447-4AFE-8AE8-4C9A54CED10C,sign=0966a15c090fa6725d8e3a14e9ef98dc,request={
"supply-id":192
}]
[2016-11-0500:00:03,731DEBUG][http-nio-8094-exec-10][filter.ValidateRequestFilter]-Unsigned:bundle=APYQ9WATKK98V2EC&device-cpu=&device-dpi=414*736&device-model=iPhone&device-network=WiFi&device-os=iOS8.4.1&device-uuid=C52FF568-A447-4AFE-8AE8-4C9A54CED10C&request={
"supply-id":192

output直接输出到Elasticsearch

本环境需处理两套业务系统的日志



type:代表类型,其实就是将这个类型推送到Elasticsearch,方便后面的kibana进行分类搜索,一般直接命名业务系统的项目名

path:读取文件的路径



这个是代表日志报错时,将报错的换行归属于上一条message内容

start_position=>"beginning"是代表从文件头部开始读取



filter{}中的grok是采用正则表达式来过滤日志,其中%{TIMESTAMP_ISO8601}代表一个内置获取2016-11-0500:00:03,731时间的正则表达式的函数,%{TIMESTAMP_ISO8601:date1}代表将获取的值赋给date1,在kibana中可以体现出来

本环境有两条grok是代表,第一条不符合将执行第二条



其中index是定义将过滤后的日志推送到Elasticsearch后存储的名字

%{type}是调用input中的type变量(函数)

启动logstash





代表启动成功

安装kibana





保存退出

启动kibana







其中api-app-*和api-cxb-*从

来的,*代表所有



代表实时收集的日志条数



红色框内的就是在刚才filter过滤规则中定义的

ELK是指Elasticsearch+Logstash+Kibaba三个组件的组合。本文讲解一个基于日志文件的ELK平台的搭建过程,有关ELK的原理以及更多其他信息,会在接下来的文章中继续研究。
  在这个系统中,Elasticsearch主要充当一个全文检索和分析引擎,Logstash是一款分布式日志收集系统,Kibana可以为这个平台提供可视化的Web界面。

一、环境准备

  三台虚拟机:m000,m001,m002,操作系统版本为Ubuntu-14.04
  Elasticsearch-2.3.2
  Logstash-2.3.2
  Kibana-4.5.1
  JDK-1.7.0_79  
  
  在该系统中ELK的关系如下图所示:
  


二、各组件的部署

  ELK的运行依赖于Java环境,JDK可自行安装,本节主要讲Elasticsearch,Logstash和Kibaba的安装和配置过程。

1、Elasticsearch

(1)elasticsearch
  下载地址:https://www.elastic.co/downloads/elasticsearch
  下载好后,上传到m000:/usr/local/elk路径下解压缩,设置/usr/local/elasticsearch软连接指向该路径。进入ES_HOME/config目录中编辑elasticsearch.yml文件。设置network.host:m000,http.port:9200设置访问地址和端口号,否则不能在浏览器中访问。设置cluster.name:es_cluster,node.name:m000,这两个参数主要设置ES集群的集群名称,以及这台机器在集群中的名称。设置path.data:/usr/local/elasticsearch/data,path.logs:/usr/local/elasticsearch/logs,这两个参数主要设置ES存储data和log的路径。
  配置好后启动输入ES_HOME/bin/elasticsearch命令启动es(加入参数-d,es会在后台运行),正常启动如下图:
  


  在另外一个命令窗口中检查启动状态,
  


  也可在页面上查看,输入m000:9200,
  


  经过上述操作检查无误后,可以将m000上的elasticsearch分发到m001和m002机器上,记得分发后修改各自的node.name以及network.host两个属性。网上很多关于Elasticsearch的安装中都说道,保持每台机器上的cluster.name属性一致时,如果将各自的es服务都启动,系统会自动将clustername相同的机器组成一个集群。但是在本次操作中发现,这个描述对Elasticsearch-2.3.3版本无效,集群需要手动指定。分别修改m000,m001,m002三台机器上的elasticsearch.yml文件,在其中加入discovery.zen.ping.unicast.hosts:["m000","m001","m002"]和discovery.zen.minimum_master_nodes:2参数后,将m000-003上的elasticsearch服务启动才能组成一个集群。

(2)head插件
  Elasticsearch可以安装很多插件,接下来我们安装一个head插件,用于查看集群相关信息。往往大多数生产环境中服务器都不能联外网,所以我们采取下载安装的方式。
  Head插件的下载地址:https://github.com/mobz/elasticsearch-head
下载后,在ES_HOME/plugins路径下解压缩,重命名成head,并删除压缩包。这样,在启动m000,m001和m002上的ES服务后,在http://m000:9200/_plugin/head/可以在页面上看到如下信息:



  在这个页面上可以看到有三台机器,其中m000是master节点,在该页面上可以查看各节点的其他信息。
  
(3)marvel插件
  Marvel是Elasticsearch的管理和监控工具,它提供了一个叫做Sense的交互式控制台供用户通过浏览器直接与Elasticsearch进行交互。有关这个插件的联网安装方式,网上也有很多介绍,本文仍然以Offline方式安装marvel插件。
  下载以下三个文件:

https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/license/2.3.3/license-2.3.3.zip

https://download.elastic.co/elasticsearch/release/org/elasticsearch/plugin/marvel-agent/2.3.3/marvel-agent-2.3.3.zip

https://download.elasticsearch.org/elasticsearch/marvel/marvel-2.3.3.tar.gz
  将这三个文件上传到m000:/usr/local/elk  
hadoop@m000:/usr/local/elasticsearch/bin$./plugininstallfile:///usr/local/elk/license-2.3.3.zip
hadoop@m000:/usr/local/elasticsearch/bin$./plugininstallfile:///usr/local/elk/marvel-agent-2.3.3.zip
  将m000:/usr/local/elasticsearch/plugins目录分发到m001和m002上。
  第三个文件marvel-2.3.3.tar.gz会在Kibana部分用到。
  有关marvel的配置,可以参考官方网站:https://www.elastic.co/guide/en/marvel/current/configuration.html
  关于使用,会在下面Kibana中讲解。

2、Logstash

  下载地址:https://www.elastic.co/downloads/logstash
  下载好后,上传到m000:/usr/local/elk路径下解压缩,设置/usr/local/logstash软连接指向该路径。
  接下来用Logstash将Hadoop的yarn日志加载进来,进入LS_HOME,新建一个config_file目录存储自定义配置文件log.conf。

input{ file{ type=>"hadoop-yarnlog" path=>"/usr/local/hadoop/logs/yarn-hadoop-resourcemanager-m000.log" } } output{ elasticsearch{ hosts=>"m000:9200" index=>"logstash-%{type}-%{+YYYY.MM.dd}" template_overwrite=>true } }

1

2

3

4

5

6

7

8

9

10

11

12

13

14

  启动Logstash,LS_HOME/bin/logstashagent-fconfig_file/log.conf在ES的head页面可以看到多了一个logstash-hadoop-yarnlog的文件,下图中前面两个是marvel相关数据。
  


  可以在数据浏览,基本查询,复合查询中对该文件进行相关的查询操作。比如输入时间参数查看最近4分钟的日志文件:
  


  更多的查询操作,可以在对Elasticsearch进一步学习中讲解到。
  

3、Kibana

  下载地址:https://www.elastic.co/downloads/kibana
  下载好后,上传到m000:/usr/local/elk路径下解压缩,设置/usr/local/kibana软连接指向该路径。进入KB_HOME/config文件夹,编辑kibana.yml文件,设置server.port:5601,server.host:"m000,elasticsearch.url:"http://m000:9200"。
  启动kibaba,KB_HOME/bin/kibana,正常启动可以在控制台中看到如下信息:
  


  在浏览器中输入m000:5601,可以看到如下页面:
  


看到上图中的界面,表示ELK已经正常配置了,点击Create–>Discover



可以看到加载的日志文件中的一些数据。

  最后继续在Kibana中把marvel插件安装完成。输入KB_HOME/bin/kibanaplugin--installmarvel--urlfile:///usr/local/elk/marvel-2.3.3.tar.gz如下图
  


启动kibana,浏览器访问m000:5601,点击选图中的红色方框,



在新窗口中选择Marvel,可以看到Elasticsearch集群的监控信息,



点击上图中的es_cluster,可以看到es_cluster集群中各节点详细监控信息,



1.介绍:

NRT
elasticsearch是一个近似实时的搜索平台,从索引文档到可搜索有些延迟,通常为1秒。

集群
集群就是一个或多个节点存储数据,其中一个节点为主节点,这个主节点是可以通过选举产生的,并提供跨节点的联合索引和搜索的功能。集群有一个唯一性标示的名字,默认是elasticsearch,集群名字很重要,每个节点是基于集群名字加入到其集群中的。因此,确保在不同环境中使用不同的集群名字。一个集群可以只有一个节点。强烈建议在配置elasticsearch时,配置成集群模式。

节点
节点就是一台单一的服务器,是集群的一部分,存储数据并参与集群的索引和搜索功能。像集群一样,节点也是通过名字来标识,默认是在节点启动时随机分配的字符名。当然啦,你可以自己定义。该名字也蛮重要的,在集群中用于识别服务器对应的节点。

节点可以通过指定集群名字来加入到集群中。默认情况下,每个节点被设置成加入到elasticsearch集群。如果启动了多个节点,假设能自动发现对方,他们将会自动组建一个名为elasticsearch的集群。

索引
索引是有几分相似属性的一系列文档的集合。如nginx日志索引、syslog索引等等。索引是由名字标识,名字必须全部小写。这个名字用来进行索引、搜索、更新和删除文档的操作。
索引相对于关系型数据库的库。

类型
在一个索引中,可以定义一个或多个类型。类型是一个逻辑类别还是分区完全取决于你。通常情况下,一个类型被定于成具有一组共同字段的文档。如ttlsa运维生成时间所有的数据存入在一个单一的名为logstash-ttlsa的索引中,同时,定义了用户数据类型,帖子数据类型和评论类型。
类型相对于关系型数据库的表。

文档
文档是信息的基本单元,可以被索引的。文档是以JSON格式表现的。
在类型中,可以根据需求存储多个文档。
虽然一个文档在物理上位于一个索引,实际上一个文档必须在一个索引内被索引和分配一个类型。
文档相对于关系型数据库的列。

分片和副本
在实际情况下,索引存储的数据可能超过单个节点的硬件限制。如一个十亿文档需1TB空间可能不适合存储在单个节点的磁盘上,或者从单个节点搜索请求太慢了。为了解决这个问题,elasticsearch提供将索引分成多个分片的功能。当在创建索引时,可以定义想要分片的数量。每一个分片就是一个全功能的独立的索引,可以位于集群中任何节点上。
分片的两个最主要原因:
a、水平分割扩展,增大存储量
b、分布式并行跨分片操作,提高性能和吞吐量
分布式分片的机制和搜索请求的文档如何汇总完全是有elasticsearch控制的,这些对用户而言是透明的。
网络问题等等其它问题可以在任何时候不期而至,为了健壮性,强烈建议要有一个故障切换机制,无论何种故障以防止分片或者节点不可用。
为此,elasticsearch让我们将索引分片复制一份或多份,称之为分片副本或副本。
副本也有两个最主要原因:
高可用性,以应对分片或者节点故障。出于这个原因,分片副本要在不同的节点上。
提供性能,增大吞吐量,搜索可以并行在所有副本上执行。
总之,每一个索引可以被分成多个分片。索引也可以有0个或多个副本。复制后,每个索引都有主分片(母分片)和复制分片(复制于母分片)。分片和副本数量可以在每个索引被创建时定义。索引创建后,可以在任何时候动态的更改副本数量,但是,不能改变分片数。
默认情况下,elasticsearch为每个索引分片5个主分片和1个副本,这就意味着集群至少需要2个节点。索引将会有5个主分片和5个副本(1个完整副本),每个索引总共有10个分片。
每个elasticsearch分片是一个Lucene索引。一个单个Lucene索引有最大的文档数LUCENE-5843,文档数限制为2147483519(MAX_VALUE–128)。可通过_cat/shards来监控分片大小。

索引和类型的解释:



ELK的含义:

E:elasticsearch

  ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTfulweb接口。

  也就是将logstach收集上来的日志储存,建立索引(便于查找),搜索(提供web展示)

l:logstash

  收集日志

  数据源:各种log,文本,session,silk,snmp

k:kibana

  数据展示,web页面,可视化

  可以完成批量分析

  数据集之间关联

  产生图表

  报警(python/R语言)

  ESpythonapi的文档

  pythonapihttp://elasticsearch-py.readthedocs.io/en/master/

  大量的查询或者过滤选项可以使用json语法:

  任何周期都能查询

ELK关系:

LEK:logstatsh收集日志,存到elasticserach(存储,产生索引,搜索)到kibana展现(view)

2.安装

1、下载tar包直接解压(灵活)

2、配置yum源直接安装(方便)

服务器部署:

logstatsh:部署在想收集日志的服务器上。

elasticsearch:主要是用于数据收集,索引,搜索提供展示,随意安装在那台服务器上都可以,重要的是es支持分布式,而且再大规模的日志分析中必须做分布式集群。这样可以跨节点索引和搜索。提高吞吐量与计算能力。

kibana:数据展示,部署在任意服务器上。

这里我们做实验使用的是两台服务器

1
2
node1.wawa.com:192.168.31.179
node2.wawa.com:192.168.31.205

a、准备环境:

  配置hosts两台服务器网络通畅  

  node1安装es,node2安装es做成集群,后期可能还会用到redis,redis提供的功能相当于kafka,收集logstatsh发来的数据,es从redis中提取数据。

  node1安装kibana做数据展示

  node2安装logstatsh做数据收集

  创建elasticsearch用户

b、安装:

  由于eslogstatshkibana基于java开发,所以安装jdk,jdk版本不要过低,否则会提醒升级jdk。

安装elasticsearch(node1,node2全都安装es)

下载并安装GPGkey

2.x

1
[root@linux-node1~]#rpm--import'target='_blank'>https://packages.elastic.co/GPG-KEY-elasticsearch[/code]
5.1

1
rpm--importhttps://artifacts.elastic.co/GPG-KEY-elasticsearch
  

添加yum仓库 

1
2
3
4
5
6
7
[root@linux-node2~]#vim/etc/yum.repos.d/elasticsearch.repo
[elasticsearch-2.x]
name=Elasticsearchrepositoryfor2.xpackages
baseurl=http://packages.elastic.co/elasticsearch/2.x/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
1
2
3
4
5
6
7
8
[elasticsearch-5.x]
name=Elasticsearchrepositoryfor5.xpackages
baseurl=https://artifacts.elastic.co/packages/5.x/yum
gpgcheck=1
gpgkey=https://artifacts.elastic.co/GPG-KEY-elasticsearch
enabled=1
autorefresh=1
type=rpm-md
es需要jdk8,但是由于服务器有的业务需要1.7,所以可以让两个共存  



安装elasticsearch

1
[root@hadoop-node2~]#yuminstall-yelasticsearch
问题:

阿里服务器下载和用yum安装由于链接是https的问题报错

增加yum源报错


报错内容

下载GPGkey


ViewCode

下载tar包安装,更简单,解压即可运行,只不过没有yum安装提供的启动脚本

安装kibana(这里使用的tar包安装,es、logtar包方法一样)

1
2
3
4
5
[root@linux-node2~]#cd/usr/local/src
[root@linux-node2~]#wget'target='_blank'>https://download.elastic.co/kibana/kibana/kibana-4.3.1-linux-x64.tar.gz[/code]tarzxfkibana-4.3.1-linux-x64.tar.gz
[root@linux-node1src]#mvkibana-4.3.1-linux-x64/usr/local/
[root@linux-node2src]#ln-s/usr/local/kibana-4.3.1-linux-x64//usr/local/kibana
安装logstatsh(node2安装)

下载并安装GPGkey

1
[root@linux-node2~]#rpm--importhttps://packages.elastic.co/GPG-KEY-elasticsearch
添加yum仓库  

1
2
3
4
5
6
7
[root@linux-node2~]#vim/etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstashrepositoryfor2.1.xpackages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
安装logstash  

1
[root@linux-node2~]#yuminstall-ylogstash
  

c、配置管理elasticsearch

1
2
3
4
5
6
7
8
9
10
[root@linux-node1src]#grep-n'^[a-Z]'/etc/elasticsearch/elasticsearch.yml
17:cluster.name:chuck-cluster判别节点是否是统一集群,多台统一集群的es名称要一致
23:node.name:linux-node1节点的hostname
33:path.data:/data/es-data数据存放路径
37:path.logs:/var/log/elasticsearch/日志路径
43:bootstrap.memory_lock:true锁住内存,使内存不会再swap中使用
54:network.host:0.0.0.0允许访问的ip
58:http.port:9200端口
[root@linux-node1~]#mkdir-p/data/es-data
[root@linux-node1src]#chownelasticsearch.elasticsearch/data/es-data/
  

d、启动elasticsearch

[root@node2~]#/etc/init.d/elasticsearchstatus elasticsearch(pid23485)正在运行... Youhavenewmailin/var/spool/mail/root [root@node2~]#psaux|grepelasticsearch 505234852.153.12561964264616?Sl17:096:07/usr/bin/java-Xms256m-Xmx1g-Djava.awt.headless=true-XX:+UseParNewGC-XX:+UseConcMarkSweepGC-XX:CMSInitiatingOccupancyFraction=75-XX:+UseCMSInitiatingOccupancyOnly-XX:+HeapDumpOnOutOfMemoryError-XX:+DisableExplicitGC-Dfile.encoding=UTF-8-Djna.nosys=true-Des.path.home=/usr/share/elasticsearch-cp/usr/share/elasticsearch/lib/elasticsearch-2.4.2.jar:/usr/share/elasticsearch/lib/*org.elasticsearch.bootstrap.Elasticsearchstart-p/var/run/elasticsearch/elasticsearch.pid-d-Des.default.path.home=/usr/share/elasticsearch-Des.default.path.logs=/var/log/elasticsearch-Des.default.path.data=/var/lib/elasticsearch-Des.default.path.conf=/etc/elasticsearch root264250.00.1103260844pts/0S+21:570:00grepelasticsearch [root@node2~]#ss-tunlp|grepelasticsearch [root@node2~]#ss-tunlp|grep23485 tcpLISTEN050:::9200:::*users:(("java",23485,132)) tcpLISTEN050:::9300:::*users:(("java",23485,89))

启动问题:

+ViewCode

network.host:要填写本机的ip地址,最好是内网。

e、测试



交互方式:

交互的两种方法

JavaAPI:
nodeclient
Transportclient

RESTfulAPI
Javascript
.NET
php
Perl
Python
Ruby

ElasticSearch是一个基于Lucene的搜索服务器。它提供了一个分布式多用户能力的全文搜索引擎,基于RESTfulweb接口。

1、我们使用RESTfulweb接口

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@linux-node1src]#curl-i-XGET'http://192.168.56.11:9200/_count?pretty'-d'{
"query"{#查询
"match_all":{}#所有信息
}
}'<br>####################<br>HTTP/1.1200OK
Content-Type:application/json;charset=UTF-8
Content-Length:95
{
"count":0,索引0个
"_shards":{分区0个
"total":0,
"successful":0,成功0个
"failed":0失败0个
}
}
  

2、使用es强大的插件:head插件显示索引和分片情况

f、安装插件

1
2
[root@linux-node1src]#/usr/share/elasticsearch/bin/plugininstallmobz/elasticsearch-head
[root@linux-node1src]#/usr/share/elasticsearch/bin/pluginlist可以查看当前已经安装的插件
 


访问刚刚安装的head插件

1
http://192.168.31.179:9200/_plugin/head/
  



添加数据测试

 


  


增加:

命令行插入数据与查询数据(RESTful接口处理的JSON请求)

1
curl-XPOSThttp://127.0.0.1:9330/logstash-2017.01.09/testlog-d'{"date":"123456","user":"chenlin7","mesg":"firstmesasge"}'
 

返回值

1
{"_index":"logstash-2017.01.09","_type":"testlog","_id":"AVmBUmd9WXPobNRX0V5f","_version":1,"_shards":{"total":2,"successful":1,"failed":0},"created":true}
可以看到,在数据写入的时候,会返回该数据的。这就是后续用来获取数据的关键:

获取数据

1
curl-XGEThttp://127.0.0.1:9330/logstash-2017.01.09/testlog/AVmBUmd9WXPobNRX0V5f
返回值:

1
{"_index":"logstash-2017.01.09","_type":"testlog","_id":"AVmBUmd9WXPobNRX0V5f","_version":1,"found":true,"_source":{"date":"123456","user":"chenlin7","mesg":"firstmesasge"}}
这个里的内容,正是之前写入的数据。

如果觉得这个返回看起来有点太过麻烦,可以使用_source直接指定要获取内容

1
curl-XGEThttp://127.0.0.1:9330/logstash-2017.01.09/testlog/AVmBUmd9WXPobNRX0V5f/_source
返回值

1
{"date":"123456","user":"chenlin7","mesg":"firstmesasge"}
 

也可以直接指定字段:

curl-XGET'target='_blank'>http://115.29.229.72:9330/logstash-2017.01.09/testlog/AVmBUmd9WXPobNRX0V5f\?fields\=user,mesg[/code]
返回值

1
{"_index":"logstash-2017.01.09","_type":"testlog","_id":"AVmBUmd9WXPobNRX0V5f","_version":1,"found":true,"fields":{"user":["chenlin7"],"mesg":["firstmesasge"]}}%
删除

删除指定的单条数据

curl-XDELETE'target='_blank'>http://115.29.229.72:9330/logstash-2017.01.09/testlog/AVmB7OKdWXPobNRX0V5m[/code]
删除整个索引(尝试删除某一个类型应该是不支持)

curl-XDELETEhttp://115.29.229.72:9330/logstash-2017.01.09  orcurl-XDELETEhttp://115.29.229.72:9330/logstash-2017.01.*(支持通配符)
  

更新

更新有两种方法,意识全量提交,指明_id才发一次请求

1
2
3
4
5
6
#curl-XPOST'target='_blank'>http://127.0.0.1:9200/logstash-2015.06.21/testlog/[/code]AU4ew3h2nBE6n0qcyVJK-d'{
"date":"1434966686000",
"user":"chenlin7",
"mesg"""firstmessageintoElasticsearchbutversion2"
}'
另一个是局部更新使用/_update接口

指定doc添加或修改字段

1
curl-XPOST'http://127.0.0.1:9330/logstash-2017.01.09/testlog/AVmB92lCWXPobNRX0V5v/_update'-d'{"doc":{"age":"18"}}'
指定script(文档中操作是这样。没有试过)

#curl-XPOST'http://127.0.0.1:9200/logstash-2015.06.21/testlog
/AU4ew3h2nBE6n0qcyVJK/_update'-d'{
"script":"ctx._source.user=\"someone\""
}'

搜索请求

全文搜索:ES的搜索请求,有简易语法和完整语法两种

简易语法作为以后在kibana上最常用的方式。

1
curl-XGEThttp://115.29.229.72:9330/logstash-2017.01.09/testlog1/_search\?q\=first
这样就获取到了logstash-2017.01.09索引中的testlog1类型中first关键字的所有数据

{"took":4,"timed_out":false,"_shards":{          "total":5,"successful":5,"failed":0        },   "hits":{          "total":1,"max_score":0.30685282,        "hits":[{            "_index":"logstash-2017.01.09",          "_type":"testlog1","_id":"AVmB90IfWXPobNRX0V5u",       "_score":0.30685282,"_source":{           "date":"123456",           "user":"chenlin7",           "mesg":"firstmesasge"}         }]     } }

还可以使用

1
curl-XGEThttp://115.29.229.72:9330/logstash-2017.01.09/testlog/_search\?q\=user:"chenlin7"
或者知道某个字段一定在那个key中:例子中就是first一定是在mesg中

1
curl-XGEThttp://115.29.229.72:9330/logstash-2017.01.09/testlog/_search\?q\=mesg:first
  

  

node2安装好以后配置集群模式

1
2
3
4
[root@node1src]#scp/etc/elasticsearch/elasticsearch.yml192.168.56.12:/etc/elasticsearch/elasticsearch.yml
[root@node2elasticsearch]#sed-i'23s#node.name:linux-node1#node.name:linux-node2#g'elasticsearch.yml
[root@node2elasticsearch]#mkdir-p/data/es-data
[root@node2elasticsearch]#chownelasticsearch.elasticsearch/data/es-data/
 

node1与node2中都配置上(单播模式,听说还有组播默认,可以尝试一下)

1
2
3
[root@linux-node1~]#grep-n"^discovery"/etc/elasticsearch/elasticsearch.yml
79:discovery.zen.ping.unicast.hosts:["linux-node1","linux-node2"]
[root@linux-node1~]#systemctlrestartelasticsearch.service
  在浏览器中查看分片信息,一个索引默认被分成了5个分片,每份数据被分成了五个分片(可以调节分片数量),下图中外围带绿色框的为主分片,不带框的为副本分片,主分片丢失,副本分片会复制一份成为主分片,起到了高可用的作用,主副分片也可以使用负载均衡加快查询速度,但是如果主副本分片都丢失,则索引就是彻底丢失。



安装使用kopf插件,监控elasticsearch(elasticsearch服务器都安装)

1
[root@linux-node1bin]#/usr/share/elasticsearch/bin/plugininstalllmenezes/elasticsearch-kopf


重启es服务,访问,没有意外你就能看到这个界面





还有什么别的用暂时还不知道

安装logstatsh

下载并安装GPGkey

1
[root@linux-node2~]#rpm--import'target='_blank'>https://packages.elastic.co/GPG-KEY-elasticsearch[/code]
添加yum仓库

1
2
3
4
5
6
7
[root@linux-node2~]#vim/etc/yum.repos.d/logstash.repo
[logstash-2.1]
name=Logstashrepositoryfor2.1.xpackages
baseurl=http://packages.elastic.co/logstash/2.1/centos
gpgcheck=1
gpgkey=http://packages.elastic.co/GPG-KEY-elasticsearch
enabled=1
安装logstash

1
[root@linux-node2~]#yuminstall-ylogstash
也可以下载logstash的tar包解压即可使用

安装后就可以测试了

logstatsh有两种启动方式,一种用就是测试启动,一种就是正式启动

logstash工作方式:logstatsh的功能是收集日志文件,并将收集的日志文件发送给es服务器。然后es服务器产生索引,提供搜索,并且再交给web展示

但是日志类型和索引名称都是在logstatsh中定义的

a、首先我们熟悉logstatsh的格式是以jason为格式,其中定义输入输出

1
‘input{stdin{}}output{stdout{}}’
input:输入,output:输出

input可以是命令行手动输入,也可以是指定一个文件,或者一个服务,  

output是输出位置。可以是屏幕打印,也可以指定es服务器

我们先做一个最基础的命令行输入,和屏幕输出

1
2
3
4
5
6
[root@node2bin]#/opt/logstash/bin/logstash-e'input{stdin{}}output{stdout{}}'<br>#stdin指定输入为stdin标准输入output:指定stdout标准输出<br><br>Settings:Defaultfilterworkers:1
Logstashstartupcompleted
<br>chuck-->命令行输入
2016-01-14T06:01:07.184Znode2chuck==>屏幕输出
<br>www.chuck-blog.com-->命令行输入
2016-01-14T06:01:18.581Znode2www.chuck-blog.com==>屏幕输出
使用rubudebug显示详细输出,codec为一种编解码器

1
2
3
4
5
6
7
8
9
10
[root@node2bin]#/opt/logstash/bin/logstash-e'input{stdin{}}output{stdout{codec=>rubydebug}}'#codec指定输出的解码器,不知道还有没有别的解码器
Settings:Defaultfilterworkers:1
Logstashstartupcompleted
chuck--->屏幕输入
{
"message"=>"chuck",
"@version"=>"1",
"@timestamp"=>"2016-01-14T06:07:50.117Z",
"host"=>"node2"
}--->rubydebug格式输出
上述每一条输出的内容称为一个事件,多个相同的输出的内容合并到一起称为一个事件(举例:日志中连续相同的日志输出称为一个事件)! 

**Logstash会给时间添加一些额外信息,最重要的就是@timestamp,用来标记时间的发生时间。因为这个字段涉及到Logs他说的内部流传,所以必须是一个joda对象,如果你尝试自己给一个字符串

字段命名为@timestamp,Logstash会直接报错。所以,青丝用filter/data插件来管理这个特殊字段

此外大多数时候,还可以见到另外几个。

1
2
3
4
5
1、host标记时间发生在哪里

2、type标记时间的唯一类型

3、tags标记时间的某方面属性。这是一个数组,一个时间可以有多个标签。
 

Logstash格式及支持的数据类型: Logstash格式被命名为区段(section) section的格式是: input{   stdin{   }   syslog{   } } 数据类型

bool

    debug=>true

string

    host=>"hostname"

number

    ip=>127.0.0.1

array

    match=>["datetime","Unix"]

hash

    options=>{

      key1=>"value1",

      key2=>"value2"

    }

**如果版本低于1.2.0hash的写法和array是一样的

字段:



 

使用logstash将信息写入到elasticsearch

1
[root@linux-node2bin]#/opt/logstash/bin/logstash-e'input{stdin{}}output{elasticsearch{hosts=>["192.168.1.105:9200"]}}'
1
2
3
4
5
6
7
#这里定义的output就是指定es服务器的地址以及端口,也可以直接写hostname
Settings:Defaultfilterworkers:1
Logstashstartupcompleted
maliang
chuck
www.google.com
www.baidu.com




也可以本地输出,和远程发送同时进行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
[root@-node2bin]#/opt/logstash/bin/logstash-e'input{stdin{}}output{elasticsearch{hosts=>["192.168.1.105:9200"]}stdout{codec=>rubydebug}}'
Settings:Defaultfilterworkers:1
Logstashstartupcompleted
www.google.com
{
"message"=>"www.google.com",
"@version"=>"1",
"@timestamp"=>"2016-01-14T06:27:49.014Z",
"host"=>"node2"
}
www.elastic.com
{
"message"=>"www.elastic.com",
"@version"=>"1",
"@timestamp"=>"2016-01-14T06:27:58.058Z",
"host"=>"node2"
}
  



使用logstatsh读取一个配置文件,把写好的规则放在文件中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
[root@node2~]#cattest.conf
input{stdin{}}
output{
elasticsearch{hosts=>["192.168.31.105:9200"]}#发送
stdout{codec=>rubydebug}              #并且显示
}
<br><strong>[root@linux-node1~]#/opt/logstash/bin/logstash-ftest.conf</strong><br>
Settings:Defaultfilterworkers:1
Logstashstartupcompleted
123
<br>{
"message"=>"123",
"@version"=>"1",
"@timestamp"=>"2016-01-14T06:51:13.411Z",
"host"=>"lnode1
如果你是yum安装,就可以把这个位置文件放在/etc/logstash/conf.d/下面直接启动logstatsh就直接发送给es服务器了

b、学习编写conf格式

输入插件配置,此处以file为例,可以设置多个

1
2
3
4
5
6
7
8
9
10
input{
file{
path=>"/var/log/messages"
type=>"syslog"#类型
}
file{
path=>"/var/log/apache/access.log"
type=>"apache"#类型
}
}  
介绍几种收集文件的方式,可以使用数组方式或者用*匹配,也可以写多个path

1
2
path=>["/var/log/messages","/var/log/*.log"]
path=>["/data/mysql/mysql.log"]
设置boolean值

1
ssl_enable=>true
文件大小单位

1
2
3
4
my_bytes=>"1113"#1113bytes
my_bytes=>"10MiB"#10485760bytes
my_bytes=>"100kib"#102400bytes
my_bytes=>"180mb"#180000000bytes
jason收集

1
codec=>“json”
hash收集 

1
2
3
4
5
match=>{
"field1"=>"value1"
"field2"=>"value2"
...
}
端口

1
port=>33
密码

1
my_password=>"password"
c、学习编写input的file插件

  


sincedb_path:记录logstash读取位置的路径
start_postion:包括beginning和end,指定收集的位置,默认是end,从尾部开始
add_field加一个域
discover_internal发现间隔,每隔多久收集一次,默认15秒

d、学习编写output的file插件



e、通过input和output插件编写conf文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
[root@node3~]#cat/etc/logstash/conf.d/syslog.conf
input{
file{
path=>"/var/log/my_syslog"#日志地址
type=>"syslog"         #自定义类型
start_position=>"beginning" #从头开始读取日志 
}
}
output{
elasticsearch{            #输出推送给es服务器  
hosts=>["node2.gitlab.com"]#es服务器地址
index=>"system-%{+YYYY.MM.dd}"  #自定义索引  
}
}
我们不是配置了两台es吗,怎么就发给一个呢?是因为es服务器本身支持集群分片,当数据到达es服务器的时候,es服务器自己会将日志信息分散到所有其他的服务器上。

然后我们就能够在页面上看到了

  两台服务器,然后每台服务分成了5份,在浏览器中查看分片信息,一个索引默认被分成了5个分片,每份数据被分成了五个分片(可以调节分片数量),下图中外围带绿色框的为主分片,不带框的为副本分片,主分片丢失,副本分片会复制一份成为主分片,起到了高可用的作用,主副分片也可以使用负载均衡加快查询速度,但是如果主副本分片都丢失,则索引就是彻底丢失。


f、使用type来匹配类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
input{
file{
path=>"/var/log/my_syslog"
type=>"syslog"
start_position=>"beginning"
}

file{
path=>"/var/log/messages"
type=>"system"
start_position=>"beginning"
}

file{
path=>"/opt/ela/logs/my-application.log"
type=>"elk-log"
start_position=>"beginning"
}
}

output{
if[type]=="system"{
elasticsearch{
hosts=>["node2.gitlab.com"]
index=>"system-%{+YYYY.MM.dd}"
}
}

if[type]=="elk-log"{
elasticsearch{
hosts=>["node2.gitlab.com"]
index=>"elklog-%{+YYYY.MM.dd}"
}
}

if[type]=="syslog"{
elasticsearch{
hosts=>["node2.gitlab.com"]
index=>"system-%{+YYYY.MM.dd}"
}
}
}
**start_position仅在文件未被监控过的时候起作用,如果sincedb文件中已经有监控文件的inode记录了,那么Logstash依然会从记录过的pos开始读取。所以重复测试的时候每次需要删除sincedb文件。不过有一个巧妙的方法

就是把sincedb文件的位置定义在/dev/null中,这样每次重启自动从开头读取

g、把多行整个报错收集到一个事件中



以at.org开头的内容都属于同一个事件,但是显示在不同行,这样的日志格式看起来很不方便,所以需要把他们合并到一个事件中

引入codec的multiline插件

官方文档提供

1
2
3
4
5
6
7
8
9
input{
stdin{
codec=>multiline{
`pattern=>"pattern,aregexp"
negate=>"true"or"false"
what=>"previous"or"next"`
}
}
}
regrxp:使用正则,什么情况下把多行合并起来
negate:正向匹配和反向匹配
what:合并到当前行还是下一行
在标准输入和标准输出中测试以证明多行收集到一个日志成功

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
[root@linux-node1~]#catmuliline.conf
input{
stdin{
codec=>multiline{
pattern=>"^\["
negate=>true
what=>"previous"
}
}
}
output{
stdout{
codec=>"rubydebug"
}
}
[root@linux-node1~]#/opt/logstash/bin/logstash-fmuliline.conf
Settings:Defaultfilterworkers:1
Logstashstartupcompleted
[1
[2
{
"@timestamp"=>"2016-01-15T06:46:10.712Z",
"message"=>"[1",
"@version"=>"1",
"host"=>"linux-node1"
}
chuck
chuck-blog.com
123456
[3
{
"@timestamp"=>"2016-01-15T06:46:16.306Z",
"message"=>"[2\nchuck\nchuck-bloh\nchuck-blog.com\n123456",
"@version"=>"1",
"tags"=>[
[0]"multiline"
],
"host"=>"linux-node1"
继续将上述实验结果放到all.conf的es-error索引中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
[root@linux-node1~]#catall.conf
input{
file{
path=>"/var/log/messages"
type=>"system"
start_position=>"beginning"
}
file{
path=>"/var/log/elasticsearch/chuck-clueser.log"
type=>"es-error"
start_position=>"beginning"
codec=>multiline{
pattern=>"^\["
negate=>true
what=>"previous"
}
}
}
output{
if[type]=="system"{
elasticsearch{
hosts=>["192.168.56.11:9200"]
index=>"system-%{+YYYY.MM.dd}"
}
}
if[type]=="es-error"{
elasticsearch{
hosts=>["192.168.56.11:9200"]
index=>"es-error-%{+YYYY.MM.dd}"
}
}
}
Logstash使用一个名叫FileWatch的RubyGem库来监听文件变化。这个库支持glob展开文件路径,并且会记录一个叫.sincedb的数据库文件来跟踪被坚挺的日志文件的当前读取位置。

sincedb文件中记录了每个被坚挺的文件的inode,majornumber,minornumber和pos

  

h.使用log4j插件收集tomcat日志

首先在tomcat的log4j配置文件中进行修改,让日志输出到一个地方,然后使用Logstash去这个地方收集

这个地方就是一个ip+port

一般tomat中log4j的配置有两种形式,一种是log4j.properties另一种是log4j.xml文件位置:

第一种:

1
webapps/ROOT/WEB-INF/classes/log4j.properties

log4j.properties

input{
log4j{
type=>"testapi3"#日志类型
host=>"127.0.0.1"#接受的地址
port=>4990#接受的端口
}
}

output{
stdout{
codec=>rubydebug
}
}

其他参数

add_field:添加一个字段到时间中

类型hash

默认为空{}

codec:输入时的字符编码

默认为"plain"

data_timeout:超时时间

默认值为5

读超时秒。如果一个特定的TCP连接空闲时间超过这个超时周期,就认为这个任务死了,并不在监听。如果你不想超时,用-1。

host:监听地址

默认:0.0.0.0

如实是服务器的话,就监听这个。如果是客户端则连接这个地址

mode:设置是服务器还是客户端(server|client)

默认server

模式切换:服务器监听客户端的连接,客户端发送到服务器

tags

类型:array

没有设置默认值

添加任意数量的任意标签的事件。这可以帮助处理。

处理结果样式图:



kibana的配置

1
2
3
4
5
[root@node2logs]#grep'^[a-Z]'/opt/svr/kibana/config/kibana.yml
server.port:5601
server.host:"0.0.0.0"
elasticsearch.url:"http://localhost:9200"
kibana.index:".kibana"
启动

1
2
[root@node2kibana]#nohup./bin/kibana&
[1]6722
1
2
[root@node2kibana]#ss-tunlp|grep5601
tcpLISTEN0128*:5601*:*users:(("node",6722,11))
在kibana中添加一个elklog索引



点击create创建

kibana通过elklog的索引去es服务器上搜索有关日志

 


点击discover即可查看到图形界面

 


 

  
内容来自用户分享和网络整理,不保证内容的准确性,如有侵权内容,可联系管理员处理 点击这里给我发消息
标签:  ElasticSearch elk