logstash

### 重量级  300多m  建议使用filebeat轻量级

image-20241007151345803

image-20241007151749456

安装
基于rpm方式安装logstash:
(1)下载软件包
wget ogstash-7.17.5-x86_64.rpm

(2)安装logstash
rpm -ivh logstash-7.17.5-x86_64.rpm

(3)验证logstash版本
ln -svf /usr/share/logstash/bin/logstash /usr/local/sbin
logstash -V

(4)基于命令行启动logstash实例
logstash -e "input { stdin { type => stdin } } output { stdout { codec => rubydebug } }"

(5)测试logstash
自行输入数据即可。



基于二进制方式安装logstash:
(1)下载软件包
wget logstash-7.17.5-linux-x86_64.tar.gz


(2)解压软件包
tar xf logstash-7.17.5-linux-x86_64.tar.gz -C /app

(3)验证logstash版本
ln -svf /app/logstash-7.17.5/bin/logstash /usr/local/sbin/
logstash -V

(4)基于命令行启动logstash实例
logstash -e "input { stdin { type => stdin } } output { stdout {} }"

(5)测试logstash
自行输入数据即可。
logstash -e "input { stdin { type => stdin } } output { stdout { codec => rubydebug } }"

image-20241007151240524


编写文件测试

### vi /app/logstash/conf/001.conf
input { 
  stdin { type => stdin } 
} 

output { 
  stdout {} 
}

[root@elk101 ~]# logstash -f /app/logstash/conf/001.conf

image-20241007152223737

结合filebeat 输入到es

------------------------ logstash ------------
vi /app/logstash/conf/filebeat-es001.conf
input { 
  # 指定输入的类型是一个beats
  beats {
    # 指定监听的端口号
    port => 8888
  }
} 

output { 
  # 将数据在标准输出显示
  stdout {} 
  # 将数据写入ES集群
  elasticsearch {
    # 指定ES主机地址
    hosts => ["http://localhost:9200"]
    # 指定索引名称
    index => "logstash"
  }
}

logstash -f /app/logstash/conf/filebeat-es001.conf 

------------------------ filebeat ------------------------
vim output_port.yaml
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log*

# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["localhost:8888"]



-------------------------------------------
echo 666666666666 >> /var/log/nginx/access.log 

image-20241007153212102

image-20241007153253294


判断nginx中的ip来源信息(geoip)实战

vim /app/logstash/conf/geoip.yaml
input { 
  # 指定输入的类型是一个beats
  beats {
    # 指定监听的端口号
    port => 8888
  }
} 

filter {
  # 根据IP地址分析客户端的经纬度,国家,城市信息等。
  geoip {
     source => "clientip"
     ### 排除不需要的字段
     remove_field => [ "agent","log","input","host","ecs","tags" ]
  }

}
output { 
  # 将数据在标准输出显示
  stdout {} 
  # 将数据写入ES集群
  elasticsearch {
    # 指定ES主机地址
    hosts => ["http://localhost:9200"]
    # 指定索引名称
    index => "logstash-ip"
  }
}

[root@elk101 ~]# logstash -rf config/02-beats-to-stdout.conf 
######## filebeat 
filebeat.inputs:
- type: log
  paths:
    - /var/log/nginx/access.log*
  json.keys_under_root: true
  json.add_error_key: true


# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["localhost:8888"]

image-20241007155300672

image-20241007155536130

将日志实际写入时间更正案例(grok)(date)

## 对默认的nginx 日志 进行分析转换
(1)logstash配置文件
cat beats-grok_geoip_date-es.conf
input { 
  beats {
    port => 8888
  }
} 

filter {
   grok {
      match => { "message" => "%{HTTPD_COMBINEDLOG}" }
      remove_field => [ "agent","log","input","host","ecs","tags" ]
   }

  geoip {
     source => "clientip"
  }

  date {
      # 匹配时间字符串字段并格式化
      # "22/Nov/2015:11:57:34 +0800"
      match => [ "timestamp", "dd/MMM/yyyy:HH:mm:ss Z" ]
      # 匹配时区
      timezone => "Asia/Shanghai"
      # 将转后的日期替换为指定字段,若不指定,则默认值为"@timestamp"
      target => "date"
  }
}

output { 
 #stdout {} 
  
 elasticsearch {
   hosts => ["http://localhost:9200"]
   index => "logstash-nginx-date"
 }
}

logstash -rf config/05-beats-grok_geoip_date-es.conf



(2)filebeat配置文件
cat config/nginx-to-logstash.yaml 
filebeat.inputs:
- type: log
  paths:
    - /tmp/aaaa/access.log


# 将数据输出到logstash中
output.logstash:
  # 指定logstash的主机和端口
  hosts: ["10.0.0.101:8888"]

filebeat -e -c config/19-nginx-to-logstash.yaml