Logstash Log Collection and Splitting, with Caveats

I. Collecting Logs with Logstash


1. The Logstash configuration file

[root@web01 ~]# vim /etc/logstash/logstash.yml
path.config: /etc/logstash/conf.d

2. Collecting a log file to a file

[root@web01 ~]# vim /etc/logstash/conf.d/file_file.conf
input {
  file {
    path => "/var/log/messages"
    start_position => "beginning"
  }
}
output {
  file {
    path => "/tmp/messages_%{+YYYY-MM-dd}.log"
  }
}
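
Before relying on path.config auto-loading, the pipeline can be checked and run by hand. A minimal sketch, assuming the default package layout:

#Validate the config syntax only, then exit
[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file_file.conf -t
#Run the pipeline in the foreground
[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/file_file.conf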

3. Collecting a log file to ES

[root@web01 ~]# vim /etc/logstash/conf.d/file_es.conf
input {
  file {
    path => "/var/log/messages"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["172.16.1.51:9200"]
    index => "messages_%{+YYYY-MM-dd}.log"
  }
}
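
To confirm the index was created, the _cat API can be queried (a quick check, assuming ES answers on 172.16.1.51):

[root@web01 ~]# curl -s '172.16.1.51:9200/_cat/indices?v' | grep messages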

4. Collecting multiple logs to files

[root@web01 ~]# vim /etc/logstash/conf.d/file_file.conf
input {
  file {
    type => "messages_log"
    path => "/var/log/messages"
    start_position => "beginning"
  }
  file {
    type => "secure_log"
    path => "/var/log/secure"
    start_position => "beginning"
  }       
}        
output {  
  if [type] == "messages_log" { 
    file {
      path => "/tmp/messages_%{+YYYY-MM-dd}"
    }        
  }
  if [type] == "secure_log" {
    file {
      path => "/tmp/secure_%{+YYYY-MM-dd}"
    }
  } 
}

5. Collecting multiple logs to ES

1) Method 1:

[root@web01 ~]# vim /etc/logstash/conf.d/more_es.conf 
input {
  file {
    type => "messages_log"
    path => "/var/log/messages"
    start_position => "beginning"
  }
  file {
    type => "secure_log"
    path => "/var/log/secure"
    start_position => "beginning"
  }
}
output {
  if [type] == "messages_log" {
    elasticsearch {
      hosts => ["10.0.0.51:9200"]
      index => "messages_%{+YYYY-MM-dd}"
    }
  }
  if [type] == "secure_log" {
    elasticsearch {
      hosts => ["10.0.0.51:9200"]
      index => "secure_%{+YYYY-MM-dd}"
    }
  }
}

[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/more_es.conf &

#After starting, check the new indices in the browser

2) Method 2:

[root@web01 ~]# vim /etc/logstash/conf.d/more_es_2.conf 
input {
  file {
    type => "messages_log"
    path => "/var/log/messages"
    start_position => "beginning"
  }
  file {
    type => "secure_log"
    path => "/var/log/secure"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "%{type}_%{+YYYY-MM-dd}"
  }
}

[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/more_es_2.conf --path.data=/data/logstash/more_es_2 &

3) Running multiple instances

#Create a separate data directory for each instance
[root@web01 ~]# mkdir /data/logstash/more_es_2
[root@web01 ~]# mkdir /data/logstash/more_es

#Use --path.data at startup to point each instance at its own data directory
[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/more_es.conf --path.data=/data/logstash/more_es &
[root@web01 ~]# /usr/share/logstash/bin/logstash -f /etc/logstash/conf.d/more_es_2.conf --path.data=/data/logstash/more_es_2 &

#If resources allow, use multiple instances to collect multiple logs; if the server cannot run multiple instances, configure a single file that collects multiple logs instead
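
To confirm both instances are up, a quick check (a sketch: each instance binds its monitoring API to the next free port in the 9600-9700 range):

[root@web01 ~]# ps -ef | grep [l]ogstash
[root@web01 ~]# curl -s 'localhost:9600/?pretty'    #first instance
[root@web01 ~]# curl -s 'localhost:9601/?pretty'    #second instance takes the next free port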

II. Collecting Tomcat Logs with Logstash

1. Install Tomcat

1. Install the Java environment
[root@web01 ~]# rpm -ivh jdk-8u181-linux-x64.rpm

2. Upload the package
[root@web01 ~]# rz apache-tomcat-10.0.0-M7.tar.gz

3. Unpack it
[root@web01 ~]# tar xf apache-tomcat-10.0.0-M7.tar.gz -C /usr/local/

4. Create a symlink
[root@web01 ~]# ln -s /usr/local/apache-tomcat-10.0.0-M7 /usr/local/tomcat

5. Start Tomcat
[root@web01 ~]# /usr/local/tomcat/bin/startup.sh

6. Browse to 10.0.0.7:8080

2. Collecting Tomcat logs to a file

[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_file.conf
input {
  file {
    path => "/usr/local/tomcat/logs/localhost_access_log.*.txt"
    start_position => "beginning"
  }
}
output {
  file {
    path => "/tmp/tomcat_%{+YYYY-MM-dd}.log"
  }
}

3. Collecting Tomcat logs to ES

[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_es.conf
input {
  file {
    path => "/usr/local/tomcat/logs/localhost_access_log.*.txt"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "tomcat_%{+YYYY-MM-dd}.log"
  }
}

III. Changing the Tomcat Log Format for Collection

#When collecting Tomcat logs, a single error (such as a stack trace) is split into many separate events, which makes it hard to read

Solutions:
1. Change the Tomcat log format to JSON
    1) Have the developers change the application's log output to JSON
    2) Change the Tomcat config so the access log format is JSON
2. Use the multiline codec of the Logstash input plugin

1. Method 1: change the Tomcat log format

1) Configure Tomcat to log in JSON

[root@web01 ~]# vim /usr/local/tomcat/conf/server.xml
#Comment out the original log format and add ours (inside the XML attribute, quotes must be escaped as &quot;)
<Valve className="org.apache.catalina.valves.AccessLogValve" directory="logs"
               prefix="tomcat_access_json" suffix=".log"
               pattern="{&quot;clientip&quot;:&quot;%h&quot;,&quot;ClientUser&quot;:&quot;%l&quot;,&quot;authenticated&quot;:&quot;%u&quot;,&quot;AccessTime&quot;:&quot;%t&quot;,&quot;method&quot;:&quot;%r&quot;,&quot;status&quot;:&quot;%s&quot;,&quot;SendBytes&quot;:&quot;%b&quot;,&quot;Query?string&quot;:&quot;%q&quot;,&quot;partner&quot;:&quot;%{Referer}i&quot;,&quot;AgentVersion&quot;:&quot;%{User-Agent}i&quot;}"/>

2) Restart Tomcat

[root@web01 ~]# /usr/local/tomcat/bin/shutdown.sh
[root@web01 ~]# /usr/local/tomcat/bin/startup.sh
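
To verify the new format, fire one request and inspect the newest access log; each line should now be a single JSON object. A sketch, assuming the valve's default fileDateFormat of yyyy-MM-dd:

[root@web01 ~]# curl -s -o /dev/null 10.0.0.7:8080
[root@web01 ~]# tail -1 /usr/local/tomcat/logs/tomcat_access_json.$(date +%F).log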

3) Collect the new Tomcat logs

[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_json_es.conf
input {
  file {
    path => "/usr/local/tomcat/logs/tomcat_access_json.*.log"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "tomcat_json_%{+YYYY-MM-dd}.log"
  }
}

2. Method 2: collect logs with the multiline codec

1) Test the multiline configuration

[root@web01 ~]# vim /etc/logstash/conf.d/test_multiline.conf
input {
  stdin {
    codec => multiline {
      #lines that begin with [
      pattern => "^\["
      #negate: merge the lines that do NOT match the pattern
      negate => true
      #merge upward into the previous event; use "next" to merge downward
      what => "previous"
    }
  }
}
output {
  stdout {
    codec => json
  }
}

#Test: typed lines are not printed immediately; the buffered lines are only emitted as one event when a line starting with [ arrives
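
An interactive session might look like this (illustrative sample input only):

hello
world
[2020-08-14 10:00:00] next event
#-> only when the line starting with [ arrives are "hello" and "world" flushed together as one merged event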

2) Collect Tomcat error logs

[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_multiline.conf
input {
  file {
    path => "/usr/local/tomcat/logs/tomcat_access_json.*.log"
    start_position => "beginning"
    codec => multiline {
      pattern => "^\["
      negate => true
      what => "previous"
    }
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "tomcat_json_%{+YYYY-MM-dd}"
    codec => "json"
  }
}

3) Write error log content into the log file for testing

[root@web01 ~]# cat 1.txt >> /usr/local/tomcat/logs/tomcat_access_json.2020-08-14.log

4) Check the data in Kibana

The keys and values split out of message show a yellow warning icon and cannot be used in visualizations yet.
Fix this by refreshing the field list on the index pattern management page.

IV. Collecting Nginx Logs

1. Install Nginx

[root@web01 ~]# yum install -y nginx

2. Configure the Nginx log format

[root@web01 ~]# vim /etc/nginx/nginx.conf
... ...
http {
    log_format  json  '{"@timestamp":"$time_iso8601",'
                      '"host":"$server_addr",'
                      '"clientip":"$remote_addr",'
                      '"size":$body_bytes_sent,'
                      '"responsetime":$request_time,'
                      '"upstreamtime":"$upstream_response_time",'
                      '"upstreamhost":"$upstream_addr",'
                      '"http_host":"$host",'
                      '"url":"$uri",'
                      '"referer":"$http_referer",'
                      '"agent":"$http_user_agent",'
                      '"status":"$status"}';

    access_log  /var/log/nginx/access.log  json;
... ...
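
Check the syntax, reload, and confirm each access log line is now JSON (assuming Nginx serves on the default port 80):

[root@web01 ~]# nginx -t
[root@web01 ~]# systemctl reload nginx
[root@web01 ~]# curl -s -o /dev/null 10.0.0.7
[root@web01 ~]# tail -1 /var/log/nginx/access.log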

3. Collect the Nginx logs

[root@web01 ~]# vim /etc/logstash/conf.d/nginx_json.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "nginx_json_%{+YYYY-MM-dd}.log"
  }
}


V. Splitting the Collected Log into Fields

1. Method 1:

1) Modify the Tomcat log collection config

[root@web01 ~]# vim /etc/logstash/conf.d/tomcat_json_es.conf

input {
  file {
    path => "/usr/local/tomcat/logs/tomcat_access_json.*.log"
    start_position => "beginning"
  }
}

#Process the collected data: parse the JSON string in message into fields
filter {
  json {
    source => "message"
  }
}

output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "tomcat_json_%{+YYYY-MM-dd}.log"
  }
}

2) Drop the redundant data

#message has been parsed into separate fields, but the raw string is still kept; remove the message field
filter {
  json {
    source => "message"
    remove_field => ["message"]
  }
}

2. Method 2:

1) Modify the Nginx log collection config

#Nginx needs no extra parsing config; just decode the format while collecting
[root@web01 ~]# vim /etc/logstash/conf.d/nginx_json.conf 
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    codec => "json"
  }
}
output {
  elasticsearch {
    hosts => ["10.0.0.51:9200"]
    index => "nginx_json_%{+YYYY-MM-dd}.log"
  }
}

VI. Writing Collected Logs to Redis

1. Install Redis

2. Configure Logstash to write to Redis

[root@web01 ~]# vim /etc/logstash/conf.d/nginx_to_redis.conf
input {
  file {
    path => "/var/log/nginx/access.log"
    start_position => "beginning"
    codec => "json"
  }
}
output {
  redis {
    host => "172.16.1.51"
    port => "6379"
    data_type => "list"
    db => "0"
    key => "nginx_log"
  }
}
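
Once events flow, the list can be inspected from the Redis side; a downstream Logstash would read it back with a redis input using the same key and data_type:

[root@web01 ~]# redis-cli -h 172.16.1.51 LLEN nginx_log     #number of queued events
[root@web01 ~]# redis-cli -h 172.16.1.51 LRANGE nginx_log 0 0   #peek at the first event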

VII. Collecting and Splitting Custom Company Logs

Many companies' logs do not match any service's default format, so we have to split them ourselves.

1. Sample log to split

2018-02-24 11:19:23,532 [143] DEBUG performanceTrace 1145 http://api.114995.com:8082/api/Carpool/QueryMatchRoutes 183.205.134.240 null 972533 310000 TITTL00 HUAWEI 860485038452951 3.1.146 HUAWEI 5.1 113.552344 33.332737 发送响应完成 Exception:(null)

2. Splitting configuration

On Logstash, split the log with the grok plugin in the filter block

input {
        beats {
                port => "5044"
        }
}

filter {
    grok {
        match => {
            "message" => "%{TIMESTAMP_ISO8601:timestamp} \[%{NUMBER:thread:int}\] %{DATA:level} (?<logger>[a-zA-Z]+) %{NUMBER:executeTime:int} %{URI:url} %{IP:clientip} %{USERNAME:UserName} %{NUMBER:userid:int} %{NUMBER:AreaCode:int} (?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) (?<Brand>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+) %{NUMBER:DeviceId:int} (?<TerminalSourceVersion>[0-9a-z\.]+) %{NUMBER:Sdk:float} %{NUMBER:Lng:float} %{NUMBER:Lat:float} (?<Exception>.*)"
        }
        remove_field => "message"
    }
    date {
        match => ["timestamp","ISO8601"]
        remove_field => "timestamp"
    }
    geoip {
        source => "clientip"
        target => "geoip"
        database => "/etc/logstash/maxmind/GeoLite2-City.mmdb"
    }
}

output {
    elasticsearch {
        hosts => ["http://192.168.10.101:9200/"]
        index => "logstash-%{+YYYY.MM.dd}"
        document_type => "apache_logs"
    }
}

3. Result after splitting

(screenshot)

4. Final display in Kibana

① top10 clientip

(screenshot)

② top5 url

(screenshot)

③ Geographic location by IP

(screenshot)

④ top10 executeTime

(screenshot)

⑤ Any other field can be charted the same way, with many chart types, and several charts can be shown together

(screenshot)

VIII. grok in Detail

1. Introduction

  Grok is by far the best way to turn crummy, unstructured logs into something structured and queryable. Grok works perfectly on syslog, Apache and other web server logs, MySQL logs, and generally any log format.

  Grok ships with more than 120 built-in patterns: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns.

2. A first example

① Sample log

55.3.244.1 GET /index.html 15824 0.043

② Analysis

  This log splits into 5 parts: IP (55.3.244.1), method (GET), request path (/index.html), bytes (15824), and duration (0.043). The pattern (regular-expression match) that parses it is:

%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}

③ In the filter

filter { grok { match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}"} } }

④ Parsed result

client: 55.3.244.1
method: GET
request: /index.html
bytes: 15824
duration: 0.043
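
This is easy to reproduce with a throwaway stdin pipeline (a minimal sketch; paste the sample line and the parsed fields are printed to the console):

input { stdin {} }
filter {
  grok {
    match => { "message" => "%{IP:client} %{WORD:method} %{URIPATHPARAM:request} %{NUMBER:bytes} %{NUMBER:duration}" }
  }
}
output { stdout { codec => rubydebug } }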

3. Parsing logs of any format

(1) Steps for parsing an arbitrary log format:

① First decide how the log splits, i.e. how many parts one log line consists of.

② Analyze each part; if a built-in Grok pattern fits, use it directly. If nothing built-in fits, write a custom pattern.

③ Learn to debug in the Grok Debugger.

(2) Kinds of grok patterns

  • Using the built-in grok patterns (grok_pattern)

① They can be inspected locally

# less /usr/share/logstash/vendor/bundle/jruby/1.9/gems/logstash-patterns-core-4.1.1/patterns/grok-patterns


② Usage format

A grok_pattern consists of zero or more %{SYNTAX:SEMANTIC} blocks

Example: %{IP:clientip}

  SYNTAX is the name of the pattern, provided by grok: for example the number pattern is named NUMBER and the IP-address pattern is named IP

  SEMANTIC is the name you give the parsed value, defined by yourself: for example the IP field could be named client

  • Custom patterns

Format: (?<field_name>the pattern here)

Example: (?<Board>[0-9a-zA-Z]+[-]?[0-9a-zA-Z]+)

(3) Regex parsing is error-prone, so debugging in the Grok Debugger is strongly recommended (the page would not load for me, though)


IX. Collecting MySQL Logs with the mysql Module

1. Official documentation

https://www.elastic.co/guide/en/beats/filebeat/current/filebeat-module-mysql.html

2. Configure Filebeat: use the mysql module to collect the MySQL error and slow query logs

# vim filebeat.yml

#=========================== Filebeat prospectors =============================
filebeat.modules:
- module: mysql
  error:
    enabled: true
    var.paths: ["/var/log/mariadb/mariadb.log"]

  slowlog:
    enabled: true
    var.paths: ["/var/log/mariadb/mysql-slow.log"]
#----------------------------- Redis output --------------------------------
output.redis:
  hosts: ["192.168.10.102"]
  password: "ilinux.io"
  key: "httpdlogs"
  datatype: "list"
  db: 0
  timeout: 5

3. ELK: splitting the MySQL slow query log with Logstash

① Splitting config

# vim mysqllogs.conf

input {
        redis {
                host => "192.168.10.102"
                port => "6379"
                password => "ilinux.io"
                data_type => "list"
                key => "httpdlogs"
                threads => 2
        }
}

filter {
        grok {
                match => { "message" => "(?m)^#\s+User@Host:\s+%{USER:user}\[[^\]]+\]\s+@\s+(?:(?<clienthost>\S*) )?\[(?:%{IPV4:clientip})?\]\s+Id:\s+%{NUMBER:row_id:int}\n#\s+Query_time:\s+%{NUMBER:query_time:float}\s+Lock_time:\s+%{NUMBER:lock_time:float}\s+Rows_sent:\s+%{NUMBER:rows_sent:int}\s+Rows_examined:\s+%{NUMBER:rows_examined:int}\n\s*(?:use %{DATA:database};\s*\n)?SET\s+timestamp=%{NUMBER:timestamp};\n\s*(?<sql>(?<action>\w+)\b.*;)\s*(?:\n#\s+Time)?.*$" }
        }
        date {
                match => ["timestamp","UNIX"]
                remove_field => "timestamp"
        }
}

output {
        elasticsearch {
                hosts => ["http://192.168.10.101:9200/"]
                index => "logstash-%{+YYYY.MM.dd}"
                document_type => "mysql_logs"
        }
} 

② Result after splitting

(screenshot)

4. Final display in Kibana

① Which databases appear most, e.g. top2 databases

(Tables cannot be charted this way, because some statements involve no table and nothing can be extracted)

(screenshot)

② Which SQL statements appear most, e.g. top5 statements

(screenshot)

③ Which servers generate the most slow query logs, e.g. top5 servers

(screenshot)

④ Which users generate the most slow query logs, e.g. top2 users

(screenshot)

The charts can also be combined into one dashboard

(screenshot)

5. Collecting the MySQL error log with the mysql module

(1) The Filebeat config is the same as above

(2) ELK: splitting the MySQL error log with Logstash

# vim mysqllogs.conf

filter {
        grok {
                match => { "message" => "(?<timestamp>\d{4}-\d{2}-\d{2}\s+\d{2}:\d{2}:\d{2}) %{NUMBER:pid:int} \[%{DATA:level}\] (?<content>.*)" }
        }
        date {
                match => ["timestamp","YYYY-MM-dd HH:mm:ss"]
                remove_field => "timestamp"
        }
}

(3) Results not shown here

X. Collecting Multi-Agent Logs with Multiple Logstash Instances

In many cases budget is tight and logs are not collected with one Logstash per agent; having one Logstash host run multiple instances to collect and process the logs of several agents is therefore very useful.

1. Filebeat configuration

Only the output section matters: just point each agent at a different port

① Agent 1 points at port 5044

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.10.107:5044"]

② Agent 2 points at port 5045

#----------------------------- Logstash output --------------------------------
output.logstash:
  # The Logstash hosts
  hosts: ["192.168.10.107:5045"]

2. Logstash configuration

For each agent, the input listens on the corresponding port

① Agent 1

input {
        beats {
                port => "5044"
        }
}
output {   #the outputs can be differentiated per agent
        elasticsearch {
                hosts => ["http://192.168.10.107:9200/"]
                index => "logstash-apache1-%{+YYYY.MM.dd}"
                document_type => "apache1_logs"
        }
}

② Agent 2

input {
        beats {
                port => "5045"
        }
}
output {   #the outputs can be differentiated per agent
        elasticsearch {
                hosts => ["http://192.168.10.107:9200/"]
                index => "logstash-apache2-%{+YYYY.MM.dd}"
                document_type => "apache2_logs"
        }
}

Start the corresponding services and you are done.


XI. ELK Caveats Summary

1. Encoding conversion (mainly garbled Chinese text)

(1) Transcoding with codec => plain in the input

codec => plain {
         charset => "GB2312"
}

This converts GB2312-encoded text to UTF-8
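
In context, a full input block might look like this (a sketch; the log path is hypothetical):

input {
  file {
    path => "/data/app/app.log"                  #hypothetical GB2312-encoded log file
    start_position => "beginning"
    codec => plain { charset => "GB2312" }       #decode GB2312 so events are stored as UTF-8
  }
}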

(2) The encoding conversion can also be done in Filebeat (recommended)

filebeat.prospectors:
- input_type: log
  paths:
    - c:\Users\Administrator\Desktop\performanceTrace.txt
  encoding: GB2312

2. Dropping unwanted lines from logs

(1) Delete them with drop in the Logstash filter

filter {
    if ([message] =~ "^20.*-\ task\ request,.*,start\ time.*") {   #regex matching the unwanted lines to delete
        drop {}
    }
}

(2) Sample log

2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59   #the line to delete
-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End
-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End

3. Handling several different kinds of log lines with grok

(1) Sample log:

2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59
-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End
-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End

(2) In the Logstash filter, grok handles the 3 kinds of lines separately

match => {
    "message" => "^20.*-\ task\ request,.*,start\ time\:%{TIMESTAMP_ISO8601:RequestTime}"
}
match => {
    "message" => "^--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?<Pwd>.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?<DeviceId>.*)\",\"EquipmentNo\":(?<EquipmentNo>.*),\"SSID\":(?<SSID>.*),\"RegisterPhones\":(?<RegisterPhones>.*),\"AppKey\":\"(?<AppKey>.*)\",\"Version\":\"(?<Version>.*)\"\}\ --\ End.*"
}
match => {
    "message" => "^--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?<Success>[a-z]*),\"ErrorMsg\":(?<ErrorMsg>.*),\"Result\":(?<Result>.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ End.*"
}
... and so on for further line types
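
The patterns can also live in a single grok by passing an array: grok tries them in order and stops at the first match (break_on_match defaults to true). A simplified sketch that captures the request/response bodies whole instead of field by field:

filter {
  grok {
    match => {
      "message" => [
        "^20.*-\ task\ request,.*,start\ time\:%{TIMESTAMP_ISO8601:RequestTime}",
        "^--\ Request\ String\ :\ %{GREEDYDATA:RequestString}\ --\ End.*",
        "^--\ Response\ String\ :\ %{GREEDYDATA:ResponseString}\ --\ End.*"
      ]
    }
  }
}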

4. Merging multi-line logs: the multiline plugin (important)

(1) Example:

① The log

2018-03-20 10:44:01,523 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59
-- Request String : {"UserName":"15046699923","Pwd":"ZYjyh727","DeviceType":2,"DeviceId":"PC-20170525SADY","EquipmentNo":null,"SSID":"pc","RegisterPhones":null,"AppKey":"ab09d78e3b2c40b789ddfc81674bc24deac","Version":"2.0.5.3"} -- End
-- Response String : {"ErrorCode":0,"Success":true,"ErrorMsg":null,"Result":null,"WaitInterval":30} -- End

② grok handling of the merged multi-line event (once the lines are merged, everything downstream is the same, as below)

filter {
  grok {
    match => {
      "message" => "^%{TIMESTAMP_ISO8601:InsertTime}\ .*-\ task\ request,.*,start\ time:%{TIMESTAMP_ISO8601:RequestTime}\n--\ Request\ String\ :\ \{\"UserName\":\"%{NUMBER:UserName:int}\",\"Pwd\":\"(?<Pwd>.*)\",\"DeviceType\":%{NUMBER:DeviceType:int},\"DeviceId\":\"(?<DeviceId>.*)\",\"EquipmentNo\":(?<EquipmentNo>.*),\"SSID\":(?<SSID>.*),\"RegisterPhones\":(?<RegisterPhones>.*),\"AppKey\":\"(?<AppKey>.*)\",\"Version\":\"(?<Version>.*)\"\}\ --\ End\n--\ Response\ String\ :\ \{\"ErrorCode\":%{NUMBER:ErrorCode:int},\"Success\":(?<Success>[a-z]*),\"ErrorMsg\":(?<ErrorMsg>.*),\"Result\":(?<Result>.*),\"WaitInterval\":%{NUMBER:WaitInterval:int}\}\ --\ End"
    }
  }
}

(2) Using the multiline option in Filebeat (recommended)

① The multiline options

pattern: the regex that decides where lines are merged

negate: true/false; merge the lines that match the pattern, or the lines that do not match it

match: after/before (think it through for your log shape)

  after: merge lines after the matched pattern; note that with this setting the last line of the log never gets matched and processed

  before: merge lines before the matched pattern (recommended)

② Version 5.5 and later (using before)

filebeat.prospectors:
- input_type: log
  paths:
    - /root/performanceTrace*
  fields:
    type: zidonghualog
  multiline.pattern: '.*\"WaitInterval\":.*--\ End'
  multiline.negate: true
  multiline.match: before

③ Before version 5.5 (using after)

filebeat.prospectors:
- input_type: log
  paths:
    - /root/performanceTrace*
  multiline:
    pattern: '^20.*'
    negate: true
    match: after

(3) Using the multiline codec in the Logstash input (recommended when Filebeat is not used)

① The multiline options

pattern: the regex that decides where lines are merged

negate: true/false; merge the lines that match the pattern, or the lines that do not match it

what: previous/next (think it through for your log shape)

  previous: equivalent to Filebeat's after

  next: equivalent to Filebeat's before

② Usage

input {
        file {
                path => ["/root/logs/log2"]
                start_position => "beginning"
                codec => multiline {
                        pattern => "^20.*"
                        negate => true
                        what => "previous"
                }
        }
}

(4) Using the multiline plugin in the Logstash filter (not recommended)

(a) Why it is not recommended:

  ① With multiline set in the filter, the number of pipeline workers is automatically forced to 1

  ② As of version 5.5 the multiline filter is no longer bundled; to use it, it must be installed first:

  /usr/share/logstash/bin/logstash-plugin install logstash-filter-multiline

(b) Example:

filter {
  multiline {
    pattern => "^20.*"
    negate => true
    what => "previous"
  }
} 

5. Using date in the Logstash filter

(1) Sample log

2018-03-20 10:44:01 [33]DEBUG Debug - task request,task Id:1cbb72f1-a5ea-4e73-957c-6d20e9e12a7a,start time:2018-03-20 10:43:59

(2) date usage

        date {
                match => ["InsertTime","YYYY-MM-dd HH:mm:ss"]
                remove_field => "InsertTime"
        }

Note:

match => ["timestamp" ,"dd/MMM/YYYY H:m:s Z"]

  This matches a field formatted as day/month-name/year hour:minute:second timezone, e.g. 24/Feb/2018 11:19:23 +0800

It can also be written as match => ["timestamp","ISO8601"] (recommended)

(3) What date does

  It replaces @timestamp with the time parsed from the matched key in the log, because @timestamp is otherwise the time the log reached Logstash, not the real time inside the log.

6. Classified handling of multiple log types (important)

① Add a type classification in the Filebeat config

filebeat:
  prospectors:
    -
      paths:
        #- /mnt/data/WebApiDebugLog.txt*
        - /mnt/data_total/WebApiDebugLog.txt*
      fields:
        type: WebApiDebugLog_total
    -
      paths:
        - /mnt/data_request/WebApiDebugLog.txt*
        #- /mnt/data/WebApiDebugLog.txt*
      fields:
        type: WebApiDebugLog_request
    -
      paths:
        - /mnt/data_report/WebApiDebugLog.txt*
        #- /mnt/data/WebApiDebugLog.txt*
      fields:
        type: WebApiDebugLog_report

② Use if in the Logstash filter to process each type differently

filter {
   if [fields][type] == "WebApiDebugLog_request" {   #for request-type logs
        if ([message] =~ "^20.*-\ task\ report,.*,start\ time.*") {   #drop the report lines
                drop {}
        }
        grok {
                match => {"... ..."}
        }
   }
}

③ Use if in the Logstash output

output {
    if [fields][type] == "WebApiDebugLog_total" {
        elasticsearch {
            hosts => ["6.6.6.6:9200"]
            index => "logstash-WebApiDebugLog_total-%{+YYYY.MM.dd}"
            document_type => "WebApiDebugLog_total_logs"
        }
    }
}