Logstash 的自定义正则匹配
##### 测试数据
QINGHUAedu2024 教室07
[root@elk101 zz]# cat 001.conf
# Custom grok pattern file (loaded through the grok filter's patterns_dir option).
# Format is one `NAME REGEX` per line; lines starting with `#` are ignored.
# NOTE(review): YEAR is also a name in Logstash's built-in pattern set; this
# definition shadows it for every filter that loads this file, and it is stricter
# (exactly four digits) — rename it if shadowing is not intended.
YEAR [\d]{4}
# Exactly two ASCII digits, e.g. the "07" in the sample "教室07".
CLASSROOMNUMBER [0-9]{2}
# One or more uppercase ASCII letters; greedy, so it stops at the first
# non-uppercase character ("QINGHUA" in "QINGHUAedu2024").
TEACHER [A-Z]+
vim zz.conf
# Pipeline: receive raw text lines over TCP and split them into
# teacher / year / classroom_number using the custom pattern file 001.conf.
input {
  tcp {
    # Plain-text listener on 9999. The tcp input performs no authentication,
    # so anything that can reach the port can inject events into the pipeline.
    # It binds to all interfaces when `host` is not set; restrict exposure with
    # a firewall or an explicit `host` if the node is reachable from outside.
    port => 9999
    # Source tag so later filters/outputs can branch on [type].
    type => "tcp"
  }
}
filter {
  grok {
    # Directory (or single file) holding the custom pattern definitions
    # TEACHER, YEAR and CLASSROOMNUMBER; relative or absolute paths both work.
    # NOTE(review): YEAR also exists as a built-in grok pattern; the definition
    # in 001.conf shadows it here.
    patterns_dir => ["/app/logstash/conf/zz/001.conf"]
    # Match against the `message` field. `.{3}` stands in for the literal "edu"
    # of the sample line, but it accepts ANY three characters, so the pattern is
    # looser than the earlier literal form; TEACHER ([A-Z]+) is greedy and stops
    # at the first non-uppercase character.
    match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}" }
    # Added only when the match succeeds.
    add_field => {"custom-type" => "jiaoshi07-tcp"}
  }
  # A failed match leaves the event tagged _grokparsefailure; surface that as an
  # explicit field so downstream consumers can spot unparsed (or malformed) input.
  if "_grokparsefailure" in [tags] {
    mutate {
      add_field => {"parse_failure" => "true"}
    }
  }
}
output {
  # Print every event to the console; for testing only.
  stdout {}
}
#### 进行测试
[root@elk101 conf]# echo QINGHUAedu2024 教室07 | nc 127.0.0.1 9999
logstash 的单分支和多分支
单个分支
# Single-branch version: the filter section uses two INDEPENDENT `if`
# statements. NOTE(review): an event with [type] == "beats" takes the first
# `if` AND the trailing `else` of the second one, so it also receives the
# school/class fields and a second custom-type value (add_field appends to an
# existing field instead of replacing it). The multi-branch variant with
# `else if` avoids this.
input {
  beats {
    # Filebeat -> Logstash on 8888. No ssl options are configured, so the link
    # is plaintext and any client on the network can deliver events.
    port => 8888
    type => "beats"
  }
  tcp {
    # Raw TCP lines on 9999; unauthenticated, binds all interfaces unless
    # `host` is set.
    port => 9999
    type => "tcp"
  }
  http {
    # HTTP listener on the plugin's default port (8080); no `user`/`password`
    # and no TLS here, so keep it off untrusted networks or add those options.
    type => "http"
  }
}
filter {
  if [type] == "beats" {
    grok {
      # Parse combined-format access-log lines with the built-in pattern.
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      # Drop beats metadata on a successful match. Removing `tags` also
      # discards any tags Filebeat attached to the event.
      remove_field => [ "agent","log","input","host","ecs","tags" ]
    }
    geoip {
      # clientip is populated by HTTPD_COMBINEDLOG; if grok failed the field is
      # absent and geoip tags the event with _geoip_lookup_failure instead.
      source => "clientip"
      add_field => {"custom-type" => "jiaoshi07-beats"}
    }
    date {
      # Parse the access-log timestamp (format like 10/Oct/2000:13:55:36 -0700,
      # constructed example) ...
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      timezone => "Asia/Shanghai"
      # ... into this field rather than overwriting @timestamp.
      target => "oldboyedu-linux85-date"
    }
  }
  if [type] == "tcp" {
    grok {
      # Directory (or single file) with the custom patterns; relative or
      # absolute path.
      patterns_dir => ["/app/logstash/conf/zz/001.conf"]
      # Match against the `message` field. The earlier literal form is kept
      # below for reference; `.{3}` accepts any three characters in its place.
      # match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
      match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
      add_field => {"custom-type" => "jiaoshi07-tcp"}
    }
  }else {
    # Reached by every event whose type is not "tcp" — including beats events
    # (see note at the top).
    mutate {
      add_field => {
        "school" => "清华"
        "class" => "linux"
        "custom-type" => "jiaoshi07-http"
      }
    }
  }
}
output {
  stdout {}
  # Elasticsearch output left disabled. If it is enabled against a cluster with
  # security turned on, it will need https plus credentials/API key rather than
  # the bare http URL below.
  # elasticsearch {
  # hosts => ["http://localhost:9200"]
  # index => "oldboyedu-linux85-logstash-nginx"
  # }
}
# Send one sample line to the tcp input on 9999; the trailing newline is what
# the tcp input uses to delimit the event.
printf '%s\n' 'QINGHUAedu2024 教室07' | nc 127.0.0.1 9999
#### 测试 http
### 测试 nginx 日志
filebeat.inputs:
  # Input type "log": tail plain log files.
  # NOTE(review): newer Filebeat releases deprecate `log` in favour of
  # `filestream` — confirm against the Filebeat version in use.
  - type: log
    # Files to harvest; globs are expanded by Filebeat.
    paths:
      - /tmp/aaa/*.log
      - /tmp/aaa/*/*.json
      # `**` recurses into subdirectories. Filebeat expands it only to a fixed
      # number of directory levels (8 by default, presumably) — verify for the
      # version in use.
      - /tmp/aaa/**/*.exe
output.logstash:
  # Logstash beats input host:port. No ssl.* settings are configured, so events
  # travel in plaintext and the Logstash side cannot verify the sender.
  hosts: ["localhost:8888"]
------------------------------
解析了 IP(geoip 过滤器)
定义了时间(date 过滤器)
分析了 nginx 日志(利用默认的 HTTPD_COMBINEDLOG 模板)
多个分支
# Multi-branch version: `if` / `else if` / `else` make the three filter
# branches mutually exclusive, so each event is processed by exactly one of
# them (unlike the single-branch variant above).
input {
  beats {
    # Filebeat -> Logstash on 8888; plaintext unless ssl options are added.
    port => 8888
    type => "beats"
  }
  tcp {
    # Raw TCP lines on 9999; no authentication, binds all interfaces when
    # `host` is not set.
    port => 9999
    type => "tcp"
  }
  http {
    # HTTP listener on the plugin's default port (8080) with no
    # `user`/`password` and no TLS configured.
    type => "http"
  }
}
filter {
  if [type] == "beats" {
    grok {
      # Parse combined-format access-log lines with the built-in pattern.
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      # Strip beats metadata on success; `tags` removal also drops tags set by
      # Filebeat.
      remove_field => [ "agent","log","input","host","ecs","tags" ]
    }
    geoip {
      # Enrich from the clientip field produced by HTTPD_COMBINEDLOG; absent on
      # a grok failure, in which case geoip tags _geoip_lookup_failure.
      source => "clientip"
      add_field => {"custom-type" => "jiaoshi07-beats"}
    }
    date {
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      timezone => "Asia/Shanghai"
      # `target` names the field that receives the parsed date; without it the
      # plugin would overwrite @timestamp.
      target => "date"
    }
  } else if [type] == "tcp" {
    grok {
      # Directory (or single file) with the custom patterns TEACHER, YEAR and
      # CLASSROOMNUMBER; relative or absolute path.
      patterns_dir => ["/app/logstash/conf/zz/001.conf"]
      # Match against `message`. The literal "edu" form is kept for reference;
      # `.{3}` accepts any three characters, so the active pattern is looser.
      # match => { "message" => "%{TEACHER:teacher}edu%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
      match => { "message" => "%{TEACHER:teacher}.{3}%{YEAR:year} 教室%{CLASSROOMNUMBER:classroom_number}"}
      add_field => {"custom-type" => "jiaoshi07-tcp"}
    }
  }else {
    # Catches [type] == "http" and any other (or missing) type value.
    mutate {
      add_field => {
        "school" => "清华"
        "class" => "linux"
        "custom-type" => "jiaoshi07-http"
      }
    }
  }
}
output {
  stdout {}
  # Elasticsearch output left disabled; enabling it against a secured cluster
  # requires https and credentials/API key instead of the bare http URL.
  # elasticsearch {
  # hosts => ["http://localhost:9200"]
  # index => "oldboyedu-linux85-logstash-nginx"
  # }
}