ELK-Elasticsearch,Logstash,kibana搭建基于日志文件的日志分析系统

wylc123 8月前 ⋅ 327 阅读

1.系统环境

Windows 10,elasticsearch-7.2.0 ,logstash-7.2.0,kibana-7.2.0-windows-x86_64,jdk 11, nodejs 12..4.0

2. 安装配置Elasticsearch

2.1 Elasticsearch下载

历史版本下载地址:https://www.elastic.co/cn/downloads/past-releases#elasticsearch

2.2 配置启动

1)config目录修改配置文件elasticsearch.yml

network.host: 192.168.0.1 #修改可以访问的ip

http.port: 9200 #设置访问端口

2)bin目录,运行elasticsearch.bat启动

2.3 成功验证

启动完成后浏览器访问,http://localhost:9200或自己配置的ip:端口,返回如下:则配置启动成功。

3. 安装配置kibana

3.1 kibana简介

Kibana 是一个设计出来用于和 Elasticsearch 一起使用的开源的分析与可视化平台,可以用 kibana 搜索、查看、交互存放在Elasticsearch 索引里的数据,使用各种不同的图表、表格、地图等展示高级数据分析与可视化,基于浏览器的接口使你能快速创建和分享实时展现Elasticsearch查询变化的动态仪表盘,让大量数据变得简单,容易理解。

3.2 kibana下载

历史版本下载地址:https://www.elastic.co/cn/downloads/past-releases#kibana

3.3 安装条件
  1. 保证安装了JDK
  2. 保证安装node
  3. 保证安装了Elasticsearch,Elasticsearch版本要与kibana相同
3.4 配置启动

1)config目录修改配置文件kibana.yml

server.port: 5601 #kibana访问端口

server.host: "10.140.128.220" #kibana访问地址

elasticsearch.hosts: ["http://localhost:9200"] #配置elasticsearch访问地址

elasticsearch.requestTimeout: 90000 #如果有超时报错可以将这个配置改大

i18n.locale: "zh-CN" #界面国际化配置

2)bin目录,运行kibana.bat启动

3.5 成功验证

启动完成后浏览器访问,http://localhost:5601或自己配置的ip:端口,返回如下:

3.6 使用NSSM将Kibana安装为Windows服务

1.下载NSSM:http://www.nssm.cc/download,解压后目录结构如下:

2.命令行执行安装操作: nssm install kibana

会弹出下面的窗口

3.关联Kibana的批处理的启动文件

4.启动刚刚安装好的Kibana服务: nssm start kibana

4. 安装配置Logstash

4.1 下载Logstash

历史版本下载地址:https://www.elastic.co/cn/downloads/past-releases#logstash

4.2 Logstash简介

Logstash 是开源的服务器端数据处理管道, 能够动态地采集、转换和传输数据,不受格式或复杂度的影响。利用 Grok 从非结构化数据中派生出结构。

4.3 Logstash原理

Logstah处理事件有三个阶段:input、filter、output。

1. input
产生事件,常见的input有file、beats。
2. filter
结合条件语句对符合标准的事件进行处理。

  • grok:解析和结构化任何文本。Grok目前是logstash最好的方式对非结构化日志数据解析
    成结构化和可查询化。
  • mutate:在事件字段执行一般的转换。可以重命名、删除、替换和修改事件字段。
  • drop:完全丢弃事件,如debug事件
  • clone:复制事件,可能添加或者删除字段。
  • geoip:添加有关IP地理位置信息。

3. output
output是logstash管道的最后一个阶段,一个事件可以经过多个output。但是一旦所有输出
处理完,该事件已经执行完。常用的output有elasticsearch、file。

4.4 配置启动

一定要在logstash的/bin目录下,新建一个配置文件logstash_default.conf。配置文件可以参考logstash/config目录下的logstash-sample.conf。

配置文件格式:

input {
  beats {
    port => 5044
  }
}

filter {
	if "java-logs" in [tags]{
		grok{
			#筛选过滤
			match => {
				"message" => "(?<date>\d{4}-\d{2}-\d{2}\s\d{2}:\d{2}:\d{2},\d{3})\]\[(?<level>[A-Z]{4,5})\]\[(?<thread>[A-Za-z0-9/-]{4,40})\]\[(?<class>[A-Za-z0-9/.]{4,40})\]\[(?<msg>.*)"
			}
			remove_field => ["message"]
		}
		#不匹配正则则删除,匹配正则则用=~
		if [level] !~ "(ERROR|WARN|INFO)" {
		# 删除日志
		drop {}
		}
	}
}

output {
		elasticsearch {
		hosts => "localhost:9200"
	}
}

logstash配置文件详解

1. Input

input {
	path => "D:\input\a\*.log"
	exclude => "*.gz"
	start_position => "beginning"
	ignore_older => 0
	sincedb_path => "D:\input\record"
	add_field => {"test"="test"}
	type => "apache-log"
}
path:数组类型,读取文件的路径。

exclude:数组类型,排除不想监听的文件规则。

sincedb_path:字符串类型,记录sincedb文件路径,logstash会通过sincedb文件来跟踪记录每个文件中的当前位置,停止和重新启动logstash,logstash会从记录位置继续读取文件。

start_position: 字符串类型,可以配置为beginning/end,是否从头读取文件

stat_interval: 数值类型,定时检查文件是否有更新,默认是1秒

discover_interval: 数值类型,定时检查是否有新文件待读取,默认是15秒

ignore_older: 数值类型,扫描文件列表时,如果该文件上次更改时间超过设定的时长,则不做处理,但依然会监控是否有新内容,默认关闭

close_older: 数值类型,如果监听的文件在超过该设定时间内没有新内容,会被关闭文件句柄,释放资源,但依然会监控是否有新内容,默认3600秒,即1小时

add_field: 增加一个字段

type: 表明导入的日志类型

2. Codec

Codec作用于Input和Output,负责将数据在原始与Logstash Event之间转换,常见的codec有:

  1. plain: 读取原始内容
  2. dots: 将内容简化为点进行输出
  3. rubydebug: 将Logstash Events按照ruby格式输出,方便调试
  4. line: 处理带有换行符的内容
  5. json: 处理json格式的内容
  6. multiline: 处理多行数据的内容,当一个事件的message由多行组成时,需要使用multiline,常见的情况是处理日志信息。
input {
   file {
   	path => "D:\input\a\*.log"
   	start_position => "beginning"
   	codec => multiline {
   		pattern => "^\s"   
   		what => "previous"			
   	}
   }
}
pattern: 设置行匹配的正则表达式,可以使用grok
what: 可设置为previous或next。表示当与pattern匹配成功的时候,匹配行是归属上一个事件还是下一个事件
negate: 布尔值,表示是否对pattern的结果取反 

3. Filter

Filter可以对Logstash事件进行丰富的处理,比如解析数据、删除字段、类型转换等等

启动logstash

logstash -f logstash.conf

4.5 成功验证

启动成功如图所示。默认端口为9600。在浏览器中输入localhost:9600即可查看效果。

 


相关文章推荐

全部评论: 0

    我有话说: