Logstash grok configuration rules

logstash.conf

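The main thing to configure here is the grok match, which splits each log line into fields to be indexed (a match is essentially a regular-expression match).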

Original log line:

2018-04-13 16:03:49.822 INFO  o.n.p.j.c.XXXXX - Star Calculator

grok match:

match => { "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type}  %{JAVAFILE:log_file} - %{GREEDYDATA:log_content}" }

Extracted fields:

{
  "log_date": [
    [
      "2018-04-13"
    ]
  ],
  "log_localtime": [
    [
      "16:03:49.822"
    ]
  ],
  "HOUR": [
    [
      "16"
    ]
  ],
  "MINUTE": [
    [
      "03"
    ]
  ],
  "SECOND": [
    [
      "49.822"
    ]
  ],
  "log_type": [
    [
      "INFO"
    ]
  ],
  "log_file": [
    [
      "o.n.p.j.c.XXXXX"
    ]
  ],
  "log_content": [
    [
      "Star Calculator"
    ]
  ]
}
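
Because a grok match is essentially a regular expression, the match above can be approximated with a plain regex. The Python sketch below is only an illustration: the sub-expressions are simplified stand-ins for DATA, TIME, WORD, JAVAFILE and GREEDYDATA, not the exact definitions shipped with Logstash, and the name GROK_LIKE is made up for this example.

```python
import re

# Simplified stand-ins for the grok patterns (assumptions, not the exact
# definitions that ship with Logstash).
GROK_LIKE = re.compile(
    r"(?P<log_date>.*?) "                                   # DATA: lazy match
    r"(?P<log_localtime>\d{1,2}:\d{2}:\d{2}(?:[.,]\d+)?) "  # TIME
    r"(?P<log_type>\w+)\s+"                                 # WORD, then spaces
    r"(?P<log_file>[A-Za-z0-9_. -]+) - "                    # JAVAFILE, then " - "
    r"(?P<log_content>.*)"                                  # GREEDYDATA
)

line = "2018-04-13 16:03:49.822 INFO  o.n.p.j.c.XXXXX - Star Calculator"

match = GROK_LIKE.match(line)
# Every captured value is a string, just like the fields grok produces.
fields = match.groupdict() if match else {}

for name, value in fields.items():
    print(f"{name}: {value}")
```

Each named group plays the role of a `%{PATTERN:field}` reference: the group name becomes the field name, and the text it captures becomes the field value.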

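Every named field extracted above (log_date, log_localtime, log_type, log_file, log_content) ends up as a field in the Elasticsearch index mapping and can be used as a query condition. HOUR, MINUTE and SECOND are sub-captures of TIME shown by the debugger; by default Logstash keeps only the named captures.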

You can test a match at grokdebug.herokuapp.com.

All available patterns are listed at grokdebug.herokuapp.com/patterns.

The configuration we currently use is in /logstash/logstash-k8s.conf.

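Q: What if a field in the index mapping needs a specific data type?

A: Because a grok match is essentially a regular-expression match, every captured value is a string by default. When you know a value is really a number, you can declare its type directly in the pattern. Inside match, only direct conversion to int and float is supported; the syntax is %{NUMBER:response_time:int}. Full configuration: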

match => {
  "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type}  %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"
}

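To make the effect of the `:int` suffix concrete, here is a rough Python sketch of the idea: read the type suffix from each `%{PATTERN:field:type}` reference and cast the captured string accordingly. The helper names (`field_types`, `cast_fields`) and the captured sample values are hypothetical; this mimics the behaviour described above and is not how Logstash is implemented.

```python
import re

# Matches %{PATTERN}, %{PATTERN:field} and %{PATTERN:field:type}.
GROK_REF = re.compile(
    r"%\{(?P<pattern>\w+)(?::(?P<field>\w+))?(?::(?P<type>int|float))?\}"
)
CASTS = {"int": int, "float": float}


def field_types(grok_expr):
    """Map field name -> cast function for every reference that declares a type."""
    return {
        m["field"]: CASTS[m["type"]]
        for m in GROK_REF.finditer(grok_expr)
        if m["type"]
    }


def cast_fields(captured, grok_expr):
    """Cast captured strings; fields without a declared type stay strings."""
    types = field_types(grok_expr)
    return {name: types.get(name, str)(value) for name, value in captured.items()}


expr = ("%{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} "
        "%{NUMBER:size:int} %{NUMBER:response_time:int}")

# Hypothetical captured values, all strings as a regex match would return them.
captured = {"method": "GET", "uri": "/api/items", "status": "200",
            "size": "512", "response_time": "37"}

typed = cast_fields(captured, expr)
print(typed)
```

Storing status, size and response_time as numbers rather than strings is what makes numeric range queries and aggregations on them possible in Elasticsearch.
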
Q: What if the indices should be stored separately by date?

A: Specify the index name format in the output section, for example index => "k8s-%{+YYYY.MM.dd}"

Full output section:

output {
  elasticsearch {
    hosts => "${ES_URL}"
    manage_template => false
    index => "k8s-%{+YYYY.MM.dd}"
  }
}
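
As a rough illustration of how the index name is derived: Logstash fills in %{+YYYY.MM.dd} from the event's @timestamp, which is kept in UTC. The Python sketch below reproduces that naming rule; the function name `index_for` is hypothetical and the timestamps are sample values.

```python
from datetime import datetime, timedelta, timezone


def index_for(timestamp: datetime, prefix: str = "k8s-") -> str:
    """Build the daily index name from an event timestamp, using the UTC date."""
    return prefix + timestamp.astimezone(timezone.utc).strftime("%Y.%m.%d")


# An event stamped in UTC.
utc_event = datetime(2018, 4, 13, 16, 3, 49, tzinfo=timezone.utc)
print(index_for(utc_event))

# An event stamped shortly after midnight in UTC+9 still belongs to the
# previous day's index, because the date is taken in UTC.
local_event = datetime(2018, 4, 14, 1, 0, tzinfo=timezone(timedelta(hours=9)))
print(index_for(local_event))
```

One consequence of this rule is that daily indices roll over at midnight UTC, not at local midnight.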

Complete logstash.conf:

input {
  beats {
    host => "0.0.0.0"
    port => 5043
  }
}
filter {
  if [type] == "kube-logs" {
    # Move the raw container log line into the "message" field
    mutate {
      rename => ["log", "message"]
    }
    # Use the "time" field as the event timestamp (@timestamp)
    date {
      match => ["time", "ISO8601"]
      remove_field => ["time"]
    }
    grok {
      # Pod, namespace and container details come from the log file path
      match => {
        "source" => "/var/log/containers/%{DATA:pod_name}_%{DATA:namespace}_%{GREEDYDATA:container_name}-%{DATA:container_id}.log"
      }
      # Application log fields come from the log line itself
      match => {
        "message" => "%{DATA:log_date} %{TIME:log_localtime} %{WORD:log_type}  %{JAVAFILE:log_file} - %{WORD:method} %{URIPATHPARAM:uri} %{NUMBER:status:int} %{NUMBER:size:int} %{NUMBER:response_time:int}"
      }
      remove_field => ["source"]
      # Try every match instead of stopping at the first success
      break_on_match => false
    }
  }
}
output {
  elasticsearch {
    hosts => "${ES_URL}"
    manage_template => false
    index => "k8s-%{+YYYY.MM.dd}"
  }
}
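
The "source" match in the filter above pulls the pod, namespace and container details out of the log file path. A minimal Python sketch of the same split is shown below, assuming a path of the form pod_namespace_container-id.log; the sample path and the name SOURCE_LIKE are made up for illustration, and the sub-expressions are simplified stand-ins for DATA and GREEDYDATA.

```python
import re

# Lazy groups stand in for DATA, the greedy group for GREEDYDATA.
SOURCE_LIKE = re.compile(
    r"/var/log/containers/"
    r"(?P<pod_name>.*?)_"
    r"(?P<namespace>.*?)_"
    r"(?P<container_name>.*)-"   # greedy: splits on the last "-" that still matches
    r"(?P<container_id>.*?)\.log"
)

# Hypothetical container log path.
source = "/var/log/containers/my-app-7d4b9c_default_my-app-0123456789abcdef.log"

match = SOURCE_LIKE.search(source)
meta = match.groupdict() if match else {}

for name, value in meta.items():
    print(f"{name}: {value}")
```

Making container_name greedy is what lets container names that themselves contain "-" stay intact: only the final "-" before the container ID is treated as the separator.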