Logstash概念与原理

Logstash概念

Logstash是一款开源的数据收集引擎,具备实时管道处理能力。简单来说,logstash作为数据源与数据存储分析工具之间的桥梁,结合ElasticSearch以及Kibana,能够极大方便数据的处理与分析。通过200多个插件,logstash可以接受几乎各种各样的数据。包括日志、网络请求、关系型数据库、传感器或物联网等等。

Logstash工作过程

如上图,Logstash的数据处理过程主要包括:Inputs,Filters,Outputs 三部分,另外在Inputs和Outputs中可以使用Codecs对数据格式进行处理。这四个部分均以插件形式存在,用户通过定义pipeline配置文件,设置需要使用的input,filter,output,codec插件,以实现特定的数据采集,数据处理,数据输出等功能 。

  1. Inputs:用于从数据源获取数据,常见的插件如file, syslog, redis, beats 等
  2. Filters:用于处理数据如格式转换,数据派生等,常见的插件如grok, mutate, drop, clone, geoip等
  3. Outputs:用于数据输出,常见的插件如elastcisearch,file, graphite, statsd等
  4. Codecs:Codecs(编码插件)不是一个单独的流程,而是在输入和输出等插件中用于数据转换的模块,用于对数据进行编码处理,常见的插件如json,multiline。Logstash不只是一个input | filter | output 的数据流,而是一个 input | decode | filter | encode | output 的数据流!codec 就是用来 decode、encode 事件的。
     

Logstash简单实践

我们使用Logstash输出一个 “hello world” 。在终端中,像下面这样运行命令来启动 Logstash 进程:

 # bin/logstash -e 'input{stdin{}}output{stdout{codec=>rubydebug}}'

以上命令表示从控制台输入,然后通过Codec插件从控制台输出。然后终端在等待你的输入。敲入 Hello World,回车,查看结果:

{
	"@version" => "1",
	"host" => "sdn-253",
	"message" => "Hello World",
	"@timestamp" => 2019-07-01T12:28:07.207Z
}

Logstash 就像管道符一样!你输入(就像命令行的 cat )数据,然后处理过滤(就像 awk 或者 uniq之类)数据,最后输出(就像 tee )到其他地方。数据在线程之间以 事件 的形式流传。Logstash会给事件添加一些额外信息。最重要的就是 @timestamp,用来标记事件的发生时间。 大多数时候,还可以见到另外几个:

  • host 标记事件发生在哪里。
  • type 标记事件的唯一类型。
  • tags 标记事件的某方面属性。这是一个数组,一个事件可以有多个标签。
    你可以随意给事件添加字段或者从事件里删除字段。

注意:每个 logstash 过滤插件,都会有四个方法叫 add_tag, remove_tag, add_field 和remove_field。它们在插件过滤匹配成功时生效。

Logstash配置语法

数据类型

Logstash 支持少量的数据值类型:
bool

debug => true

string

host => "hostname"

number

port => 514

array

match => ["datetime", "UNIX", "ISO8601"]

hash

options => {
    key1 => "value1",
    key2 => "value2"
}

条件判断

表达式支持下面这些操作符:

相等: ==, !=, <, >, <=, >=

正则: =~(匹配正则), !~(不匹配正则)

包含: in(包含), not in(不包含)

布尔操作: and(与), or(或), nand(非与), xor(非或)

一元运算符:!(取反) ,()(复合表达式), !()(对复合表达式结果取反)

通常来说,你都会在表达式里用到字段引用。比如:

if "_grokparsefailure" not in [tags] {
	...
} else if [status] !~ /^2\d\d/ and [url] == "/noc.gif" {
	...
} else {
	...
}

Logstash插件

logstash插件功能很强大,下面会根据每个模块的情况,对常用插件进行分析。

Input模块——标准输入

我们已经使用 stdin 输入Hello World了。这也应该是 logstash 里最简单和基础的插件了。 input { stdin { } }表示从控制台输入

File插件

从文件读取数据,如常见的日志文件。文件读取通常要解决几个问题:

logstash-input-file配置:

注意:

  • 其中path匹配规则如下,路径必须使用绝对路径,不支持相对路径:
  • /var/log/.log:匹配/var/log目录下以.log结尾的所有文件
  • /var/log/**/.log:匹配/var/log所有子目录下以.log结尾的文件
  • /var/log/{app1,app2,app3}/*.log:匹配/var/log目录下app1,app2,app3子目录中以.log结尾的文件
     

file插件作为input例子如下:

input {
    # file为常用文件插件,插件内选项很多,可根据需求自行判断
    file {
	   # 要导入的文件的位置,可以使用*,例如/var/log/nginx/*.log
        path => "/var/lib/mysql/slow.log"
        # 要排除的文件
        exclude =>”*.gz”
        # 从文件开始的位置开始读,end表示从结尾开始读
        start_position => "beginning"
        # 多久之内没修改过的文件不读取,0为无限制,单位为秒
        ignore_older => 0  
        # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析
        sincedb_path => "/dev/null"
        # type字段,可表明导入的日志类型
        type => "mysql-slow"
    }
}

Http插件

input {
	http { port => 端口号 }
}

Redis插件

input {
    # redis插件为常用插件,插件内选项很多,可根据需求自行判断
    redis {
        # EVAL命令返回的事件数目,设置为5表示一次请求返回5条日志信息
	   batch_count => 1 
        # logstash redis插件工作方式
        data_type => "list" 
        # 监听的键值
        key => "logstash-test-list" 
        # redis地址
        host => "127.0.0.1" 
        # redis端口号
        port => 6379 
        # 如果有安全认证,此项为认证密码
        password => "123qwe" 
        # 如果应用使用了不同的数据库,此为redis数据库的编号,默认为0。
        db => 0 
        # 启用线程数量
        threads => 1
      }
}

Filter模块

Filter是Logstash功能强大的主要原因,它可以对Logstash Event进行丰富的处理,比如解析数据、删除字段、类型转换等等,常见的有如下几个:

Date插件

date插件可以将日期字符串解析为日期类型,然后替换@timestamp字段或者指定其他字段:

filter{
	date {
	    match => ["timestamp","dd/MMM/yyyy:HH:mm:ss Z"] 
         # 记录@timestamp时间,可以设置日志中自定的时间字段,如果日志中没有时间字段,也可以自己生成
         target=>“@timestamp”
         # 将匹配的timestamp字段放在指定的字段 默认是@timestamp
    }
}

Grok插件

grok是filter最重要的插件,grok使用正则表达式来生成grok语法,grok支持许多默认的正则表达式规则,grok中常用patterns的配置路径:

[logstash安装路径]\vendor\bundle\jruby\x.x\gems\logstash-patterns-core-x.x.x\patterns\grok-patterns

grok语法

%{SYNTAX:SEMANTIC}

SYNTAX为grok pattern的名称,SEMANTIC为赋值字段名称。%{NUMBER:duration}可以匹配数值类型,但是grok匹配出的内容都是字符串类型,可以通过在最后指定为int或者float来强转类型:%{NUMBER:duration:int}

自定义正则表达式
例如,如下定义一个关键字为version的参数,内容为两位的数字。

(?<version>[0-9]{2})

自定义grok pattern
我们通过pattern_definitions参数,以键值对的方式定义pattern名称和内容。也可以通过pattern_dir参数,以文件的形式读取pattern。

filter {
	grok {
		match => {
			"message" => "%{SERVICE:service}"
		}
		pattern_definitions => {
			"SERVICE" => "[a-z0-9]{10,11}"
		}
	}
}

Dissect插件

基于分隔符原理解析数据,解决grok解析时消耗过多cpu资源的问题。dissect语法简单,能处理的场景比较有限。它只能处理格式相似,且有分隔符的字符串。它的语法如下:

  1. %{}里面是字段
  2. 两个%{}之间是分隔符。

例如,有以下日志:

Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool

我想要把前面的日期和时间解析到同一个字段中,那么就可以这样来做:

filter {
    dissect {
        mapping => {
        	"message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}"
        }
    }
}

Mutate插件

mutate是使用最频繁的插件,可以对字段进行各种操作,比如重命名、删除、替换、更新等,主要操作如下:

1、convert类型转换

2、gsub字符串替换

3、split、join、merge字符串切割、数组合并为字符串、数组合并为数组

4、rename字段重命名

5、update、replace字段内容更新或替换。它们都可以更新字段的内容,区别在于update只在字段存在时生效,而replace在字段不存在时会执行新增字段的操作

6、remove_field删除字段
 

Json插件

将字段内容为json格式的数据解析出来,如果不指定target的话,那么filter会把解析出来的json数据直接放到根级别。配置实例如下:

filter {
	json {
		source => "message"
		target => "msg_json"
	}
}

运行结果:

{
    "@version": "1",
    "@timestamp": "2014-11-18T08:11:33.000Z",
    "host": "web121.mweibo.tc.sinanode.com",
    "message": "{\"uid\":3081609001,\"type\":\"signal\"}",
    "jsoncontent": {
        "uid": 3081609001,
        "type": "signal"
    }
}

Geoip插件

GeoIP 库可以根据 IP 地址提供对应的地域信息,包括国别,省市,经纬度等,对于可视化地图和区域统计非常有用。语法如下:

filter {
	geoip {
		source => "message"
	}
}

运行结果:

{
       "message" => "183.60.92.253",
      "@version" => "1",
    "@timestamp" => "2014-08-07T10:32:55.610Z",
          "host" => "raochenlindeMacBook-Air.local",
         "geoip" => {
                      "ip" => "183.60.92.253",
           "country_code2" => "CN",
           "country_code3" => "CHN",
            "country_name" => "China",
          "continent_code" => "AS",
             "region_name" => "30",
               "city_name" => "Guangzhou",
                "latitude" => 23.11670000000001,
               "longitude" => 113.25,
                "timezone" => "Asia/Chongqing",
        "real_region_name" => "Guangdong",
                "location" => [
            [0] 113.25,
            [1] 23.11670000000001
        ]
    }
}

Output模块

标准输出

标准输出多用于调试,配置示例:

output {
    stdout {
        codec => rubydebug
    }
}

redis插件

output {
     redis{  # 输出到redis的插件,下面选项根据需求使用
         batch => true
         # 设为false,一次rpush,发一条数据,true为发送一批
         batch_events => 50
         # 一次rpush发送多少数据
         batch_timeout => 5
         # 一次rpush消耗多少时间
         codec => plain
         # 对输出数据进行codec,避免使用logstash的separate filter
         congestion_interval => 1
         # 多长时间进项一次拥塞检查
         congestion_threshold => 5
         # 限制一个list中可以存在多少个item,当数量足够时,就会阻塞直到有其他消费者消费list中的数据
         data_type => list
         # 使用list还是publish
         db => 0
         # 使用redis的那个数据库,默认为0号
         host => ["127.0.0.1:6379"]
         # redis 的地址和端口,会覆盖全局端口
         key => xxx
         # list或channel的名字
         password => xxx
         # redis的密码,默认不使用
         port => 6379
         # 全局端口,默认6379,如果host已指定,本条失效
         reconnect_interval => 1
         # 失败重连的间隔,默认为1s
         timeout => 5
         # 连接超时的时间
         workers => 1
         # 工作进程
     }
}

elasticsearch插件

output {
    # stdout { codec => "rubydebug" }
    # 筛选过滤后的内容输出到终端显示
    elasticsearch {  # 导出到es,最常用的插件
        codec => "json"
        # 导出格式为json
        hosts => ["127.0.0.1:9200"]
        # ES地址+端口
        index => "logstash-slow-%{+YYYY.MM.dd}"
        # 导出到index内,可以使用时间变量
        user => "admin"
        password => "xxxxxx"
        # ES如果有安全认证就使用账号密码验证,无安全认证就不需要
        flush_size => 500
        # 默认500,logstash一次性攒够500条的数据在向es发送
        idle_flush_time => 1
        # 默认1s,如果1s内没攒够500,还是会一次性把数据发给ES
    }   
}

Logstash配置实例

logstash配置的时候,input和output都可以配置多个不同的入参。filter可以针对input里面的每个数据源做不一样的过滤,通过各自定义的type来匹配。配置示例如下:

input{
      kafka{
        bootstrap_servers => ["192.168.110.31:9092,192.168.110.31:9093,192.168.110.31:9094"]
        client_id => "test"
        group_id => "test"
        auto_offset_reset => "latest" //从最新的偏移量开始消费
        consumer_threads => 5
        decorate_events => true //此属性会将当前topic、offset、group、partition等信息也带到message中
        topics => ["logq","loge"] //数组类型,可配置多个topic
        type => "bhy" //所有插件通用属性,尤其在input里面配置多个数据源时很有用
      }
	file {
	   # 要导入的文件的位置,可以使用*,例如/var/log/nginx/*.log
        path => "/var/lib/mysql/slow.log"
        # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析
        sincedb_path => "/dev/null"
        # type字段,可表明导入的日志类型
        type => "mysql-slow"
    }
}
filter{
        if[type] == "bhy"{
            grok{
               ........
            }
        }
        if[type] == "mysql-slow"{
            mutate{
               ........
            }
        }
}
output {
        if[type] == "bhy"{
          elasticsearch{
               hosts => ["192.168.110.31:9200"]
               index => "school"
               timeout => 300
               user => "elastic"
               password => "changeme"
          }

        }
        if[type] == "mysql-slow"{
            ........
        }
 }

1、针对如下类型的log:

Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool

logstash的配置如下:

input {
    file {
        path => "/home/songfeihu/logstash-6.2.3/config/test.log"
        # 要导入的文件的位置,可以使用*,例如/var/log/nginx/*.log
        start_position => "beginning"
        # 从文件开始的位置开始读,end表示从结尾开始读
        ignore_older => 0
        # 多久之内没修改过的文件不读取,0为无限制,单位为秒
        sincedb_path => "/dev/null"
        # 记录文件上次读取位置,输出到null表示每次都从文件首行开始解析
    }
}
filter {
        dissect {
                mapping => {
                        "message" => "%{ts} %{+ts} %{+ts} %{src} %{prog}[%{pid}]: %{msg}"
                }
        }
        if "Starting" in [msg]{
                grok{
                        match => {"msg" => "(?<test1>[a-zA-Z0-9]+).*"}
                }
        }
       mutate {
               remove_field => ["message"]
       }
}
output {
	stdout{codec=>rubydebug}
}

output返回值:

{
          "host" => "sdn-253",
      "@version" => "1",
    "@timestamp" => 2019-06-28T08:08:58.062Z,
           "msg" => "Starting system activity accounting tool",
         "test1" => "Starting",
            "ts" => "Apr 26 12:20:02",
          "path" => "/home/songfeihu/logstash-6.2.3/config/test.log",
           "src" => "localhost",
          "prog" => "systemd",
           "pid" => "1",
       "message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool"
}

2、针对如下log:

<188>Mar 29 2019 16:57:30 BJQ-219-A1-ITCloud-FW-E8000E-1 %%01SEC/4/POLICYPERMIT(l)[1976979]:VSYS=public;

logstash配置如下:

input {
    stdin {    }
}
filter {
    grok {
        match => {
	"message" => "\<(?<id>[0-9]+)\>(?<timestamp>([a-zA-Z]+)\s[0-9]{1,2}\s[0-9]{1,4}\s[0-9]{1,2}:[0-9]{1,2}:[0-9]{1,2})\s%{HOSTNAME:hostname} \%\%(?<version>[0-9]{2})(?<model>[a-zA-Z0-9]+)\/(?<severity>[0-9])\/(?<brief>[a-zA-Z0-9]+)\S+:(?<description>.*)"
	}
    }
}
output {
stdout{codec=>rubydebug}
}

output输出如下:

{
           "host" => "sdn-253",
             "id" => "188",
      "timestamp" => "Mar 29 2019 16:57:30",
       "hostname" => "BJQ-219-A1-ITCloud-FW-E8000E-1",
          "brief" => "POLICYPERMIT",
     "@timestamp" => 2019-06-28T09:54:01.987Z,
       "severity" => "4",
       "@version" => "1",
        "version" => "01",
          "model" => "SEC",
        "message" => "<188>Mar 29 2019 16:57:30 BJQ-219-A1-ITCloud-FW-E8000E-1 %%01SEC/4/POLICYPERMIT(l)[1976979]:VSYS=public;",
    "description" => "VSYS=public;"
}

 

 

 

 

 

 

 

热门文章

暂无图片
编程学习 ·

数据重删和数据压缩主流技术分析

数据压缩随着计算能力的不断提升,当代社会正在产生越来越巨量的数据,数据压缩也被应用在生活的方方面面,如在网上打开的图片、视频、音频等都是经过压缩的。压缩可以分为无损压缩和有损压缩。无损压缩可以通过压缩文件完全恢复原始文件;而有损压缩则会丢失一部分信息。对于…
暂无图片
编程学习 ·

我为什么放弃学术选择创业:这不仅仅关乎人工智能

本文由 Stratifyd 创始人& CEO 汪晓宇(Derek Wang)所写,受福布斯技术委员会(Forbes Technology Council)邀约,原文发布在:https://www.forbes.com/。作为一名公司创始人,我经常进行自我反省,确保我所运营的公司始终坚持公司创立之初的信念。对于我和我的联合创始人…
暂无图片
编程学习 ·

Leetcode 题解 - 排序

快速选择 用于求解 Kth Element 问题,使用快速排序的 partition() 进行实现。 需要先打乱数组,否则最坏情况下时间复杂度为 O(N2)。 堆排序 用于求解 TopK Elements 问题,通过维护一个大小为 K 的堆,堆中的元素就是 TopK Elements。 堆排序也可以用于求解 Kth Element …
暂无图片
编程学习 ·

C++单例设计

单例设计模式的单例类主要用于配置文件读写, 整个项目用一个对象就够了。创建单例类class MyClass {private:MyClass(){}; //构造函数私有化private:static MyClass* m_instance;public:static MyClass* GetInstance(){if(m_instance == NULL){m_instance = new MyClass();stat…
暂无图片
编程学习 ·

低功耗蓝牙(BLE)和传感器的使用

一、低功耗蓝牙的使用Android中关于蓝牙的开发文档,可以参考Google提供的官方蓝牙文档:https://developer.android.google.cn/guide/topics/connectivity/bluetooth.html在Android开发中,应用可通过官方提供的蓝牙API执行以下操作:扫描其他蓝牙设备查询本地蓝牙适配器的配对…
暂无图片
编程学习 ·

redis知识点(一)

redis知识点(一) 问题Redis 持久化机制 缓存雪崩、缓存穿透、缓存预热、缓存更新、缓存降级等问题 热点数据和冷数据是什么 Memcache与Redis的区别都有哪些? 单线程的redis为什么这么快 redis的数据类型,以及每种数据类型的使用场景,Redis 内部结构 redis的过期策略以及内存…
暂无图片
编程学习 ·

一文详解土地增值税

在房地产开发的各个环节,所缴纳的税费不尽相同。其中在商品房销售环节需缴纳的税费比较多,土地增值税是一个主要税种,对企业来说负担比较重,涉及政策也比较复杂。如何确定土地增值税的纳税义务人,明确征税行为和范围;营改增后,土地增值税在应税收入、扣除项目金额、税款预…
暂无图片
编程学习 ·

动态任务

1.任务句柄 /* LED任务句柄 */ static TaskHandle_t LED_Task_Handle; 2.任务创建函数 BaseType_t xTaskCreate( TaskFunction_t pxTaskCode, //任务函数const char * const pcName, //任务名称const uint16_t usStackDepth, //堆栈大小void * const pvParamet…
暂无图片
编程学习 ·

设计模式-建造者模式

设计模式-建造者模式 1.问题提出 盖房项目需求需要建房子:这一过程为打桩、砌墙、封顶 房子有各种各样的,比如普通房,高楼,别墅,各种房子的过程虽然一样,但是要求不要相同的. 请编写程序,完成需求.2.传统方式解决 package builder.traditional;public abstract class Ab…
暂无图片
编程学习 ·

软件测试的基本流程

软件测试的基本流程 1. 测试需求分析阶段阅读需求 理解需求 主要就是对业务的学习 分析需求点 参与需求评审会议2. 测试计划阶段主要任务就是编写测试计划 参考软件需求规格说明书 项目总体计划,内容包括测试范围(来自需求文档),进度安排,人力物力的分配,整体测试策略的制…
暂无图片
编程学习 ·

Python超轻量数据库之SQLite

欢迎关注【无量测试之道】公众号,回复【领取资源】, Python编程学习资源干货、 Python+Appium框架APP的UI自动化、 Python+Selenium框架Web的UI自动化、 Python+Unittest框架API自动化、资源和代码 免费送啦~ 文章下方有公众号二维码,可直接微信扫一扫关注即可。1、什么是 SQ…
暂无图片
编程学习 ·

git学习日志-标签

git也可以像svn一样打标签,以此来标记发布节点,以示重要性。 对标签的操作,一般包括: 1. 列出标签 2. 创建标签 3. 删除标签 4. 检出标签 一、列出标签 执行git tag命令,就可以看到当前仓库中已经打过的标签。注意: 这个命令会以字母顺序列出标签。 也可以加上-l或--list…
暂无图片
编程学习 ·

spring @Primary-@Qualifier在spring中的使用

在spring 中使用注解,常使用@Autowired, 默认是根据类型Type来自动注入的。但有些特殊情况,对同一个接口,可能会有几种不同的实现类,而默认只会采取其中一种的情况下 @Primary 的作用就出来了。下面是个简单的使用例子。 有如下一个接口 public interface Singer {String …
暂无图片
编程学习 ·

斐波那契数列递归算法的优化

public class Fibonacci { //优化使用的数组 static long[] cach = new long[51];public static void main(String[] args) {long a = System.currentTimeMillis();System.out.println( fd( 50 ) );long l = System.currentTimeMillis();System.out.println( l - a );long l1 =…
暂无图片
编程学习 ·

神经网络模型(Backbone)

转自:https://www.cnblogs.com/silence-cho/p/11620863.html神经网络模型(Backbone)自己搭建神经网络时,一般都采用已有的网络模型,在其基础上进行修改。从2012年的AlexNet出现,如今已经出现许多优秀的网络模型,如下图所示。 主要有三个发展方向:Deeper:网络层数更深,代…
暂无图片
编程学习 ·

斐波那契(黄金分割法)查找算法(FibonacciSearch)

斐波那契(黄金分割法)查找算法(FibonacciSearch) 1.基本介绍1)黄金分割点是指把一条线段分割为两部分,使其中一部分与全长之比等于另一部分与这部分之比。取其前三位数字的近似值是0.618。由于按此比例设计的造型十分美丽,因此称为黄金分割,也称为中外比。这是一个神奇的数…