更新时间:2024-11-07 GMT+08:00
分享

算子清洗功能介绍

功能说明

Filebeat是一个轻量级的采集器,用于采集和处理,转发日志数据。Filebeat将作为代理安装在您的服务器上,监控您指定的日志文件或位置,收集日志事件,并将它们转发到kafka,推送到下一个处理点。

processors是filebeat定义了一系列对单条日志操作的方法,目前支持三类操作:

  • 减少导出字段的数量
  • 使用附加元数据增加字段
  • 执行额外的处理或解码

每个processor接受一个事件(单条日志),将用户定义的操作应用于该事件并返回。如果您定义了一个处理器列表,它们将按照在filebeat配置文件中定义的顺序执行。任何一个算子执行失败,都会直接终止执行链条,并将异常事件发送到fail_to_topic,异常信息会记录在事件中的@errMsg中。

算子配置基本结构

算子配置基本配置结构如下,具体可参见官方文档

processors:
  - <processor_name>:
     when:
       <condition>
     <parameters>
  - <processor_name>:
     when:
       <condition>
     <parameters>

其中<processor_name>指定某个算子操作,例如add_fields等。<condition>指定一个可选条件,如果条件存在,则只有在满足条件才执行算子,如果未设置任何条件,则始终执行算子。<parameters>是传递给算子的参数列表。

更复杂的条件处理可以通过使用if-then-else条件表达式配置来完成,这允许基于单个条件执行多个处理器。

processors:
  - if:
     <condition>
    then: 
      - <processor_name>:
         <parameters>
      - <processor_name>:
         <parameters>
     ...
    else: 
      - <processor_name>:
         <parameters>
      - <processor_name>:
         <parameters>
     ...

if-then-else条件表达式中,then是必须的,else是可选的。

在日志采集配置场景中,定义的processor只能针对某个input进行配置,没有全局配置。

算子介绍

fiilbeat自带的processor算子,如下:

add_cloud_metadataadd_cloudfoundry_metadataadd_docker_metadataadd_fieldsadd_host_metadataadd_idadd_kubernetes_metadataadd_labelsadd_localeadd_observer_metadataadd_process_metadataadd_tagscommunity_idconvertcopy_fieldsdecode_base64_fielddecode_cefdecode_csv_fieldsdecode_json_fieldsdecompress_gzip_fielddissectdnsdrop_eventdrop_fieldsextract_arrayfingerprintinclude_fieldsregistered_domainrenamescripttimestamptranslate_sidtruncate_fieldsurldecode

其他算子介绍如表1所示。

表1 算子介绍

算子

算子能力

算子样例

equals

比较字段是否具有特定值,只接受整数或字符串值,常和concat、when、and等算子组合使用。

例如:如果message中的type="CLI",then接后续逻辑。

- if:
     equals:
         type: "CLI"
  then:

contains

检查一个值是否是个字段的一部分,该字段可以是字符串或字符串数组。条件只接受一个字符串值。常和if、equals、and、concat算子组合使用。

例如:检查日志字段中的@logtype是否含有modelservicegateway_interface_Log,then接后续逻辑。

 - if:
     contains:
         "@logType": "modelservicegateway_interface_Log"
  then:

regexp

校验字符串是否满足正则表达式,只接受字符串。

例如:以下条件检查进程名称是否以foo开头:

regexp:
 system.process.name: "^foo.*"

range

检查该字段是在一定范围内的值,支持小于lt、小于等于lte、大于gt和大于等于gte,条件只接受整数或浮点值。

例如:以下条件通过将http.response.code字段与400进行比较来检查失败的 HTTP事务。

range:
 http.response.code:
   gte: 400  

这也可以写成:

range: http.response.code.gte: 400

以下条件检查CPU使用率的百分比值是否介于 0.5 和 0.8 之间。

range: system.cpu.user.pct.gte: 0.5
system.cpu.user.pct.lt: 0.8

network

检查该字段是否在某个IP网络范围内,支持IPv4和IPv6地址。可以使用CIDR表示法指定网络范围,例如“192.0.2.0/24”或“2001:db8::/32”,或使用以下命名范围之一:

  • loopback:匹配 127.0.0.0/8 或 范围内的环回地址 ::1/128 。
  • unicast:匹配 RFC 1122、RFC 4632 和 RFC 4291 中定义的全球广播地址,但 IPv4 广播地址 ( 255.255.255.255 ) 除外。这包括私有地址范围。
  • multicast:匹配多播地址。
  • interface_local_multicast:匹配 IPv6 接口本地多播地址。
  • link_local_unicast:匹配链路本地单播地址。
  • link_local_multicast:匹配链路本地多播地址。
  • private:匹配 RFC 1918 (IPv4) 和 RFC 4193 (IPv6) 中定义的私有地址范围。
  • public:匹配非环回、未指定、IPv4 广播、链路本地单播、链路本地多播、接口本地多播或专用的地址。
  • unspecified:匹配未指定的地址(IPv4 地址“0.0.0.0”或 IPv6 地址“::”)。

如果source.ip值在私有地址空间内,则以下条件返回true。

network:
 source.ip: private

如果destination.ip值的IPv4范围内192.168.1.0 - 192.168.1.255,则以下条件返回true。

network:
 destination.ip: '192.168.1.0/24'

当destination.ip在任何给定的子网中时,则以下条件返回true 。

network:
 destination.ip: ['192.168.1.0/24', '10.0.0.0/8', loopback]

has_fields

如果事件存在的所有给定的领域,该条件接受表示字段名称的字符串值列表。

例如:以下条件检查http.response.code字段是否存在于事件中。

has_fields: ['http.response.code']

or

逻辑条件表达式,只需满足其中一个条件

例如:配置条件appId等于null或者为空,则把appId3的值赋值给appId。

- if:
    or:
      - equals:
          appId: "null"
      - equals:
          appId: ""
  then:     
      - assign:
          source: ["appId3"]
          target_field: "appId"

and

逻辑条件表达式,同时满足两个或多个条件,常见组合算子有if、or、contains、 equals等算子。

例如:判断某个字段值为""且message中的@logtype字段含有 modelservicegateway_interface_Log,就将字段modelCost1赋值给modelCost字段。

- if:
    and:
      - equals:
          modelCost: ""
      - contains:
          "@logType": "modelservicegateway_interface_Log"
  then:     
      - assign:
          source: ["modelCost1"]
          target_field: "modelCost"   

not

接收否定逻辑表达。

例如:配置条件@logtype不为modelservicegateway_interface_Log时,把@logtype字段的值赋值给clusterName字段。

- if:
    not:
     equals:
        " @logType": "modelservicegateway_interface_Log"
  then:
      - assign:
          source: ["@logType"]
          target_field: "clusterName"

replace

针对需要对维度降维或者其他替换场景。

例如:将字段clusterName中的_modelservicegateway_interface_Log替换成""。

- replace:
      fail_on_error: false
      ignore_missing: true
      fields:          
        - {field: "clusterName",pattern: '_modelservicegateway_interface_Log',replacement: ""}
  • ignore_missing: 仅支持bool类型,如果为true,找不到源字段不会报错。
  • fail_on_error: 仅支持bool类型,如果为false,任何情况的报错,都不会抛出。
  • fields:对象数组,配置替换规则。
  • field: 源字段,日志中的字段。
  • pattern:正则表达式, 字符串两边要用单引号 。
  • replacement: 仅支持字符串,标识替换为该字段。

dissect

根据指定的分隔符切分字符串。

使用dissect算子更好地组织和解析数据,便于后续的分析或处理,使用示例如下:

- dissect:
      tokenizer: "%{key1} %{key2} %{key3|convert_datatype}"
      field: "message"
      target_prefix: ""
  • tokenizer:定义解析后的字段及字段格式,利用分隔符对字符串进行分割,此处使用空格进行分割。%{} 是字段引用的语法,key1、key2、key3是要匹配的字段名。|convert_datatype是一个转换操作,表示在分割后对key3字段的数据类型进行转换(例如从字符串转为日期格式)。
  • field:指定了要进行分割操作的原始字段,这里是 “message”,即数据中的消息内容。
  • target_prefix:用于处理器对数据解析,必须为空。

如果message字段的内容是“key1=value1 key2=value2 key3=2022-01-01”,经过dissect操作后将会生成新的字段,key1的值为 “value1”,key2为 “value2”,key3或key3_date(进行了转换)为日期格式的 “2022-01-01”。

timestamp

增加配置目标时间字段格式。

时间戳算子原本仅支持以时间对象设置目标字段,为了适配业务对时间戳的诉求,特殊增加了“target_layout”字段,用来配置目标格式,增加“target_timezone”字段,来配置目标时区。

  - timestamp:
       field: timeString
       target_field: timestamp
       target_layout: "UNIX_MS"
       timezone: "Asia/Shanghai"
       layouts:
         - '2006-01-02 15:04:05.999'
       test:
         - '2021-02-11 17:27:25.025'

target_layout支持的格式如下:

  • 不配置或者空字符串: 直接存储时间对象。
  • UNIX: 标识时间戳秒形式。
  • UNIX_MS: 标识时间戳毫秒形式。
  • UNIX_NS: 标识时间戳纳秒形式。
  • 其他时间格式: 转换成对应的时间格式字符串。

assign

根据条件赋值。

 - assign:
       source: ["spVolumeId","vod_id","campAlias"]
       target_field: "content_id_a"   
  • source:一个包含字段名称的数组,assign算子会逐一尝试取值,并赋值到新的字段名称,如果都没取到,那么不会产生新的字段名称。
  • target_field: 赋值的目标字段,如果已经存在该字段,assign算子会强制覆盖。

concat

字段之间或者字段和字符串之间拼接

   - concat:
       when: 
         and:
           - not:
               equals:
                 source: ""
           - not:
               equals:
                 target: ""
       fields:
         - { field: "%{source}"}
         - { field: "->"}
         - { field: "%{target}"}
       target_field: interface_type
  • fields: 对象数组,标识需要拼接的字符串,定义顺序代表拼接的顺序。
  • field:对象,待拼接的字符串定义,如果使用%{},那么表示是从日志字段当中取值,如果取不到将会报错,规避方法见ignore_missing、ignore_failure。
  • ignore_missing: 只能接收bool值,如果从日志字段取值找不到,且配置此字段为true,则会以空字符串代替。
  • ignore_failure: 只能接收bool值,不管发生任何异常都忽略。

decode_json

对某一个json字段进行解析。

例如:对retmsg进行解析。

   - decode_json:
       fields: 
         - field: "retmsg"
           filters:
           - {target_field: "user_id",type: "string",filter_keys: ["useId"]}
  • fields: 对象数组,标识针对一个字段的json解析。
  • field:需要json解析的字段名称。
  • ignore_missing: 只能接收bool值,定义true,如果解析的字段找不到,不会报错。
  • ignore_failure: 只能接收bool值,定义true,任何异常场景,都不会报错。
  • overwrite: 只能接收bool值,标识是否强行覆盖。
  • filters:对象数组,标识从json解析后获取的字段以及类型。
  • target_field: 解析出来的值,赋值给的字段。
  • type: 解析出来的值,转换成什么类型。支持integer、long、float、object、string、boolean、array。不建议使用object和array,会比较影响性能。
  • filter_keys: 字符串数组,标识解析出来的json中的key,会按照数组定义顺序取值,直到取到为止。支持嵌套获取,比如{"a":{"b":"c"}},可以定义a.b,取到c,正常的'.'请用'\\.'代替。
  • default: 如果没有取到值的默认值是什么,不支持空字符串。

format_tuple

针对有简单规律的字符串结构解析,比如url参数解析。

   - format_tuple:
       field: bizTag
       collection_items_terminated_by: "&"
       map_key_terminated_by: "="
       filters:
         - { target_field: "userAgent", filter_keys: ["user-agent","useragent","user_agent"], default: "-" }
         - { target_field: "serviceType", filter_keys: ["servicetype","service_type"], default: "-" }

如果输入bizTag的值是:"user-agent= Office&servicetype=wiseOap&otherArrtributes=other"

解析后日志为:{"userAgent":" Office","serviceType":"wiseOap"}

  • field:目标解析字段。
  • collection_items_terminated_by:不同的key之间的分割标记。
  • map_key_terminated_by: key与value之间的分割标记。
  • open: 解析字符串开头标记。
  • close: 解析字符串结尾标记。
  • ignore_missing: 仅接受bool类型,标识是否允许目标解析字段找不到,若为true,忽略目标字段找不到异常。
  • ignore_failure: 仅接受bool类型,标识法是否忽略异常,若为true,忽略所有异常。
  • filters: 对象数组,标识从解析后的key,取哪些key作为值。
  • target_field:目标字段,解析到的值赋值给目标字段。
  • filter_keys: 字符串数组,从解析后的key中,依次获取到有值的进行赋值。【注意:该算子会将原始值和匹配的值转换成小写比较】。
  • default: 如果没有取到值的默认值。

mapping

将事件的值做一个映射。

   - mapping:
       dict:     
         contentserver: "VideoContentServerService"
         contentmanage: "VideoContentManageService"
         onsaleserver: "VideoContentOnsaleService"
         playserver: "VideoPlayServerService"
         kmsserver: "VideoKMSService"
         drmmanage: "VideoDRMManageService"
         drmproxy: "VideoOpenDRMProxyService"
         opendrmmanage: "VideoOpenDRMManageService"
         cads: "VideoOpenDRMCADSService"
         quickserver: "VideoQuickServerService"
         shortmanage: "VideoShortManageService"
         shortserver: "VideoShortServerService"
         shortonsale: "VideoShortOnsaleService"
         sysmanage: "VideoSysManageService"
         sysserver: "VideoSysServerService"
         behavaiorservice: "VideoUserBehaviorService"
         campaignservice: "VideoUserCampaignService"
         poservice: "VideoOrderService"
         productservice: "VideoProductService"
         rightservice: "VideoUserRightService"
         userservice: "VideoUserAuthService"
       rules: 
         - { source_key: "source"  }
         - { source_key: "target_pre" }
  • dict:字典表,只支持string格式。
  • ignore_missing:仅接受bool类型,若为true,事件对象中的字段找不到,不报错。
  • ignore_failure: 仅接受bool类型,若为true,会忽略过程中所有异常。
  • mode: 模式,仅允许输入copy、rename。如果是copy那么在映射后原始字段会保留,如果是rename,那么原始字段会被删除。
  • rules:对象数组,映射规则定义。
  • source_key:事件对象中的字段,如果没有定义target_field字段,那么会复写到source_key。
  • target_field:目标字段,映射后的值赋值到该字段。
  • default_value:默认值,仅接受字符串,如果找不到那么会填入默认字段,若为空字符串会忽略。

segment

分段算子,对一个字段通过一批判断逻辑后,简单的赋值操作,比如针对时延分段。

- segment:
       field: response_time
       target_field: delay_section
       default: "-1"
       rules:
         - { label: "0-50",operation: [{operation_type: "gte", value: 0},{operation_type: "lt",value: 50}]}
         - { label: "50-100",operation: [{operation_type: "gte",value: 50},{operation_type: "lt",value: 100}]}
         - { label: "100-200",operation: [{operation_type: "gte",value: 100},{operation_type: "lt",value: 200}]}
         - { label: "200-500",operation: [{operation_type: "gte",value: 200},{operation_type: "lt",value: 500}]}
         - { label: "500-1000",operation: [{operation_type: "gte",value: 500},{operation_type: "lt",value: 1000}]}
         - { label: "1000-2000",operation: [{operation_type: "gte",value: 1000},{operation_type: "lt",value: 2000}]}
         - { label: "2000-5000",operation: [{operation_type: "gte",value: 2000},{operation_type: "lt",value: 5000}]}
         - { label: "5000-10000",operation: [{operation_type: "gte",value: 5000},{operation_type: "lt",value: 10000}]}
         - { label: ">1000",operation: [{operation_type: "gte",value: 10000}]}
  • field:日志字段,标识当前算子从这个字段获取值进行处理。
  • field_type: 字段类型,如果是string,那么算子会将原始字段转换为string后比较,其他值都会转换为double类型比较。
  • target_field:目标字段,算子操作后赋值给这个字段。
  • default: 默认值,可以不设置,那么分段判断逻辑找不到,不会进行赋值操作。赋值可以支持string、int、bool、
  • ignore_missing:仅接受bool类型,若为true,日志对象中的字段找不到,不报错。
  • ignore_failure: 仅接受bool类型,若为true,会忽略过程中所有异常。
  • rules: 对象数组,声明判断逻辑的对象,算子会按照定义顺序,从上往下进行运算,直到找到满足条件的规则为止。
  • label: 标识满足运算后的值。
  • operation:对象数组,内部的规则判断是且的关系。
  • value:日志字段比较的对象,可以填入字符或者数字。
  • operation_type:比较类型,仅允许填入的值如下。
    • eq:判断日志字段跟value值相等。
    • contain:判断日志字段包含value值,当且仅当field_type为string的时候可以使用。
    • start_with:判断日志字段是以value值开头,当且仅当field_type为string的时候可以使用
    • nd_with:判断日志字段是以value值结尾,当且仅当field_type为string的时候可以使用。
    • ignore_eq:判断日志字段在忽略大小写的情况相等,当且仅当field_type为string的时候可以使用 。
    • ignore_contain:判断日志字段在忽略大小写的情况下包含,当且仅当field_type为string的时候可以使用。
    • gte:判断日志字段大于等于value值,当且仅当field_type不为string的时候可以使用。
    • gt:判断日志字段大于value值,当且仅当field_type不为string的时候可以使用。
    • lte:判断日志字段小于等于value值,当且仅当field_type不为string的时候可以使用。
    • lt:判断日志字段小于value值,当且仅当field_type不为string的时候可以使用。

substring

用来裁剪字符串使用,比如针对拼接的traceId,裁剪获得各部分的值。

 - substring:
       fail_on_error: false
       ignore_missing: true
       fields:
         - {from: "transaction_id", begin_index: 0,end_index: 38, to: "x_traceId" }
         - {from: "transaction_id", begin_index: 0,end_index: 32, to: "x_traceid_begin" }
         - {from: "transaction_id", begin_index: -20, to: "x_traceid_spanId" }
  • ignore_missing:仅接受bool类型,若为true,日志对象中的字段找不到,不报错。
  • fail_on_error: 仅接受bool类型,若为false会忽略过程中所有异常。
  • fields:对象数组,标识裁剪字符串规则。
  • from: 数据源,标识日志中的字段。
  • begin_index:开始位置,如果是正数,则表明从字符串左侧往右第几个;如果是附属,表明从字符串右侧往左第几个,0标识字符串开头。
  • end_index:结束位置,如果是正数,则表明从字符串左侧往右第几个;如果是附属,表明从字符串右侧往左第几个,0标识字符串结尾。
  • to: 目标字段,裁剪后的值,赋值给目标字段。
  • type: 类型,裁剪后转换为该类型。仅支持integer、long、float、double、string、boolean。

timeout

该算子主要是为了预防用户配置错误导致采集到过早历史的日志。为了系统的稳定运行,不要采集过早时间的日志做分析,此算子必须配置

例如,检查当前日志的timestamp字段,这个字段的值是毫秒形式时间戳。如果当前时间减去timestamp对应的时间小于等于120分钟,那么日志正常发送,如果大于120分钟,表示出现异常,日志发送到fail_to_topic。

 - timeout
 field: "timestamp"
 layout: "UNIX_MS"
 rules:
  - {operation_type: "lte", value: 120}
  • timestamp是日志字段中的必须字段,标识当前日志的时间,配置中一定要解析出timestamp字段。
  • layout:时间戳格式,只支持UNIX_MS和UNIX两种格式。
  • rules:是一个对象数组,每个数组元素是一条判断规则,数组元素之间是且的关系。
  • operation_type:支持小于lt、小于等于lte、大于gt、大于等于gte和等于eq五种操作比较。例子中lte是当前时间减去日志时间要小于等于120分钟才算有效。
  • value:相隔分钟数。

算子组合使用样例

日志样例

2022-11-15 19:51:43.735 [20221115T115143]|AI||system|system|inner_ip|ADD|LogRecord|AIOperationLog(id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceId=null, identityId=1111111111111, userId=test, timestamp=2022-11-15 19:51:43.67, operationType=WELINK_TRIGGER)||Y - serviceId:null

算子功能

对于非json格式的如何转化成json直接解析拿到key、value。例如id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceId=null, identityId=1111111111111, userId=test, timestamp=2022-11-15 19:51:43.67, operationType=WELINK_TRIGGER。

算子脚本

- dissect:
    tokenizer: '%{time}|%{thread}|%{logVersion}|%{uid}|%{sign}|%{traceId}|%{cc}|%{operationLog}|%{bb}|%{aa}'
    field: "message"
    target_prefix: ""
    trim_values: "none"
    ignore_failure: true

- replace:
    fail_on_error: false
    ignore_missing: true
    fields:          
     - {field: "bb",pattern: 'AIOperationLog\(',replacement: "{\""}
     - {field: "bb",pattern: '\)',replacement: "\"}"}
     - {field: "bb",pattern: ' ,',replacement:"\""}
- if:
    contains:
        "operationLog": "LogRecord"
  then:
     - replace:
         fail_on_error: false
         ignore_missing: true
         fields:          
          - {field: "bb",pattern: '=',replacement: "\":\""}
          - {field: "bb",pattern: ' ',replacement:""}
          - {field: "bb",pattern: '\,',replacement: "\",\""}
     - decode_json:
        ignore_missing: true
        ignore_failure: true
        fields:
          - field: "bb"
            filters:
             - {target_field: "id",type: "string",filter_keys: ["id"]}
             - {target_field: "serviceId",type: "string",filter_keys: ["serviceId"]}
             - {target_field: "identityId",type: "string",filter_keys: ["identityId"]}
             - {target_field: "userId",type: "string",filter_keys: ["userId"]}
             - {target_field: "timestamp",type: "string",filter_keys: ["timestamp"]}
             - {target_field: "operationType",type: "string",filter_keys: ["operationType"]}  
     - drop_fields:
        fields: ["time","thread","logVersion","uid","sign","traceId","cc","aa","bb"]
        ignore_missing: true          
- drop_event:
     when:
       not:
         equals:
            "operationLog": "LogRecord" 

相关文档