算子清洗功能介绍
功能说明
Filebeat是一个轻量级的采集器,用于采集和处理,转发日志数据。Filebeat将作为代理安装在您的服务器上,监控您指定的日志文件或位置,收集日志事件,并将它们转发到kafka,推送到下一个处理点。
processors是filebeat定义了一系列对单条日志操作的方法,目前支持三类操作:
- 减少导出字段的数量
- 使用附加元数据增加字段
- 执行额外的处理或解码
每个processor接受一个事件(单条日志),将用户定义的操作应用于该事件并返回。如果您定义了一个处理器列表,它们将按照在filebeat配置文件中定义的顺序执行。任何一个算子执行失败,都会直接终止执行链条,并将异常事件发送到fail_to_topic,异常信息会记录在事件中的@errMsg中。
算子配置基本结构
算子配置基本配置结构如下,具体可参见官方文档。
processors: - <processor_name>: when: <condition> <parameters> - <processor_name>: when: <condition> <parameters>
其中<processor_name>指定某个算子操作,例如add_fields等。<condition>指定一个可选条件,如果条件存在,则只有在满足条件才执行算子,如果未设置任何条件,则始终执行算子。<parameters>是传递给算子的参数列表。
更复杂的条件处理可以通过使用if-then-else条件表达式配置来完成,这允许基于单个条件执行多个处理器。
processors: - if: <condition> then: - <processor_name>: <parameters> - <processor_name>: <parameters> ... else: - <processor_name>: <parameters> - <processor_name>: <parameters> ...
if-then-else条件表达式中,then是必须的,else是可选的。
在日志采集配置场景中,定义的processor只能针对某个input进行配置,没有全局配置。
算子介绍
fiilbeat自带的processor算子,如下:
add_cloud_metadata、add_cloudfoundry_metadata、add_docker_metadata、add_fields、add_host_metadata、add_id、add_kubernetes_metadata、add_labels、add_locale、add_observer_metadata、add_process_metadata、add_tags、community_id、convert、copy_fields、decode_base64_field、decode_cef、decode_csv_fields、decode_json_fields、decompress_gzip_field、dissect、dns、drop_event、drop_fields、extract_array、fingerprint、include_fields、registered_domain、rename、script、timestamp、translate_sid、truncate_fields、urldecode。
其他算子介绍如表1所示。
算子 |
算子能力 |
算子样例 |
---|---|---|
equals |
比较字段是否具有特定值,只接受整数或字符串值,常和concat、when、and等算子组合使用。 |
例如:如果message中的type="CLI",then接后续逻辑。 - if: equals: type: "CLI" then: |
contains |
检查一个值是否是个字段的一部分,该字段可以是字符串或字符串数组。条件只接受一个字符串值。常和if、equals、and、concat算子组合使用。 |
例如:检查日志字段中的@logtype是否含有modelservicegateway_interface_Log,then接后续逻辑。 - if: contains: "@logType": "modelservicegateway_interface_Log" then: |
regexp |
校验字符串是否满足正则表达式,只接受字符串。 |
例如:以下条件检查进程名称是否以foo开头: regexp: system.process.name: "^foo.*" |
range |
检查该字段是在一定范围内的值,支持小于lt、小于等于lte、大于gt和大于等于gte,条件只接受整数或浮点值。 |
例如:以下条件通过将http.response.code字段与400进行比较来检查失败的 HTTP事务。 range: http.response.code: gte: 400 这也可以写成: range: http.response.code.gte: 400 以下条件检查CPU使用率的百分比值是否介于 0.5 和 0.8 之间。 range: system.cpu.user.pct.gte: 0.5 system.cpu.user.pct.lt: 0.8 |
network |
检查该字段是否在某个IP网络范围内,支持IPv4和IPv6地址。可以使用CIDR表示法指定网络范围,例如“192.0.2.0/24”或“2001:db8::/32”,或使用以下命名范围之一:
|
如果source.ip值在私有地址空间内,则以下条件返回true。 network: source.ip: private 如果destination.ip值的IPv4范围内192.168.1.0 - 192.168.1.255,则以下条件返回true。 network: destination.ip: '192.168.1.0/24' 当destination.ip在任何给定的子网中时,则以下条件返回true 。 network: destination.ip: ['192.168.1.0/24', '10.0.0.0/8', loopback] |
has_fields |
如果事件存在的所有给定的领域,该条件接受表示字段名称的字符串值列表。 |
例如:以下条件检查http.response.code字段是否存在于事件中。 has_fields: ['http.response.code'] |
or |
逻辑条件表达式,只需满足其中一个条件 |
例如:配置条件appId等于null或者为空,则把appId3的值赋值给appId。 - if: or: - equals: appId: "null" - equals: appId: "" then: - assign: source: ["appId3"] target_field: "appId" |
and |
逻辑条件表达式,同时满足两个或多个条件,常见组合算子有if、or、contains、 equals等算子。 |
例如:判断某个字段值为""且message中的@logtype字段含有 modelservicegateway_interface_Log,就将字段modelCost1赋值给modelCost字段。 - if: and: - equals: modelCost: "" - contains: "@logType": "modelservicegateway_interface_Log" then: - assign: source: ["modelCost1"] target_field: "modelCost" |
not |
接收否定逻辑表达。 |
例如:配置条件@logtype不为modelservicegateway_interface_Log时,把@logtype字段的值赋值给clusterName字段。 - if: not: equals: " @logType": "modelservicegateway_interface_Log" then: - assign: source: ["@logType"] target_field: "clusterName" |
replace |
针对需要对维度降维或者其他替换场景。 |
例如:将字段clusterName中的_modelservicegateway_interface_Log替换成""。 - replace: fail_on_error: false ignore_missing: true fields: - {field: "clusterName",pattern: '_modelservicegateway_interface_Log',replacement: ""}
|
dissect |
根据指定的分隔符切分字符串。 |
使用dissect算子更好地组织和解析数据,便于后续的分析或处理,使用示例如下: - dissect: tokenizer: "%{key1} %{key2} %{key3|convert_datatype}" field: "message" target_prefix: ""
如果message字段的内容是“key1=value1 key2=value2 key3=2022-01-01”,经过dissect操作后将会生成新的字段,key1的值为 “value1”,key2为 “value2”,key3或key3_date(进行了转换)为日期格式的 “2022-01-01”。 |
timestamp |
增加配置目标时间字段格式。 |
时间戳算子原本仅支持以时间对象设置目标字段,为了适配业务对时间戳的诉求,特殊增加了“target_layout”字段,用来配置目标格式,增加“target_timezone”字段,来配置目标时区。 - timestamp: field: timeString target_field: timestamp target_layout: "UNIX_MS" timezone: "Asia/Shanghai" layouts: - '2006-01-02 15:04:05.999' test: - '2021-02-11 17:27:25.025' target_layout支持的格式如下:
|
assign |
根据条件赋值。 |
- assign: source: ["spVolumeId","vod_id","campAlias"] target_field: "content_id_a"
|
concat |
字段之间或者字段和字符串之间拼接 |
- concat: when: and: - not: equals: source: "" - not: equals: target: "" fields: - { field: "%{source}"} - { field: "->"} - { field: "%{target}"} target_field: interface_type
|
decode_json |
对某一个json字段进行解析。 |
例如:对retmsg进行解析。 - decode_json: fields: - field: "retmsg" filters: - {target_field: "user_id",type: "string",filter_keys: ["useId"]}
|
format_tuple |
针对有简单规律的字符串结构解析,比如url参数解析。 |
- format_tuple: field: bizTag collection_items_terminated_by: "&" map_key_terminated_by: "=" filters: - { target_field: "userAgent", filter_keys: ["user-agent","useragent","user_agent"], default: "-" } - { target_field: "serviceType", filter_keys: ["servicetype","service_type"], default: "-" } 如果输入bizTag的值是:"user-agent= Office&servicetype=wiseOap&otherArrtributes=other" 解析后日志为:{"userAgent":" Office","serviceType":"wiseOap"}
|
mapping |
将事件的值做一个映射。 |
- mapping: dict: contentserver: "VideoContentServerService" contentmanage: "VideoContentManageService" onsaleserver: "VideoContentOnsaleService" playserver: "VideoPlayServerService" kmsserver: "VideoKMSService" drmmanage: "VideoDRMManageService" drmproxy: "VideoOpenDRMProxyService" opendrmmanage: "VideoOpenDRMManageService" cads: "VideoOpenDRMCADSService" quickserver: "VideoQuickServerService" shortmanage: "VideoShortManageService" shortserver: "VideoShortServerService" shortonsale: "VideoShortOnsaleService" sysmanage: "VideoSysManageService" sysserver: "VideoSysServerService" behavaiorservice: "VideoUserBehaviorService" campaignservice: "VideoUserCampaignService" poservice: "VideoOrderService" productservice: "VideoProductService" rightservice: "VideoUserRightService" userservice: "VideoUserAuthService" rules: - { source_key: "source" } - { source_key: "target_pre" }
|
segment |
分段算子,对一个字段通过一批判断逻辑后,简单的赋值操作,比如针对时延分段。 |
- segment: field: response_time target_field: delay_section default: "-1" rules: - { label: "0-50",operation: [{operation_type: "gte", value: 0},{operation_type: "lt",value: 50}]} - { label: "50-100",operation: [{operation_type: "gte",value: 50},{operation_type: "lt",value: 100}]} - { label: "100-200",operation: [{operation_type: "gte",value: 100},{operation_type: "lt",value: 200}]} - { label: "200-500",operation: [{operation_type: "gte",value: 200},{operation_type: "lt",value: 500}]} - { label: "500-1000",operation: [{operation_type: "gte",value: 500},{operation_type: "lt",value: 1000}]} - { label: "1000-2000",operation: [{operation_type: "gte",value: 1000},{operation_type: "lt",value: 2000}]} - { label: "2000-5000",operation: [{operation_type: "gte",value: 2000},{operation_type: "lt",value: 5000}]} - { label: "5000-10000",operation: [{operation_type: "gte",value: 5000},{operation_type: "lt",value: 10000}]} - { label: ">1000",operation: [{operation_type: "gte",value: 10000}]}
|
substring |
用来裁剪字符串使用,比如针对拼接的traceId,裁剪获得各部分的值。 |
- substring: fail_on_error: false ignore_missing: true fields: - {from: "transaction_id", begin_index: 0,end_index: 38, to: "x_traceId" } - {from: "transaction_id", begin_index: 0,end_index: 32, to: "x_traceid_begin" } - {from: "transaction_id", begin_index: -20, to: "x_traceid_spanId" }
|
timeout |
该算子主要是为了预防用户配置错误导致采集到过早历史的日志。为了系统的稳定运行,不要采集过早时间的日志做分析,此算子必须配置 。 |
例如,检查当前日志的timestamp字段,这个字段的值是毫秒形式时间戳。如果当前时间减去timestamp对应的时间小于等于120分钟,那么日志正常发送,如果大于120分钟,表示出现异常,日志发送到fail_to_topic。 - timeout field: "timestamp" layout: "UNIX_MS" rules: - {operation_type: "lte", value: 120}
|
算子组合使用样例
日志样例
2022-11-15 19:51:43.735 [20221115T115143]|AI||system|system|inner_ip|ADD|LogRecord|AIOperationLog(id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceId=null, identityId=1111111111111, userId=test, timestamp=2022-11-15 19:51:43.67, operationType=WELINK_TRIGGER)||Y - serviceId:null
算子功能
对于非json格式的如何转化成json直接解析拿到key、value。例如id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceId=null, identityId=1111111111111, userId=test, timestamp=2022-11-15 19:51:43.67, operationType=WELINK_TRIGGER。
算子脚本
- dissect: tokenizer: '%{time}|%{thread}|%{logVersion}|%{uid}|%{sign}|%{traceId}|%{cc}|%{operationLog}|%{bb}|%{aa}' field: "message" target_prefix: "" trim_values: "none" ignore_failure: true - replace: fail_on_error: false ignore_missing: true fields: - {field: "bb",pattern: 'AIOperationLog\(',replacement: "{\""} - {field: "bb",pattern: '\)',replacement: "\"}"} - {field: "bb",pattern: ' ,',replacement:"\""} - if: contains: "operationLog": "LogRecord" then: - replace: fail_on_error: false ignore_missing: true fields: - {field: "bb",pattern: '=',replacement: "\":\""} - {field: "bb",pattern: ' ',replacement:""} - {field: "bb",pattern: '\,',replacement: "\",\""} - decode_json: ignore_missing: true ignore_failure: true fields: - field: "bb" filters: - {target_field: "id",type: "string",filter_keys: ["id"]} - {target_field: "serviceId",type: "string",filter_keys: ["serviceId"]} - {target_field: "identityId",type: "string",filter_keys: ["identityId"]} - {target_field: "userId",type: "string",filter_keys: ["userId"]} - {target_field: "timestamp",type: "string",filter_keys: ["timestamp"]} - {target_field: "operationType",type: "string",filter_keys: ["operationType"]} - drop_fields: fields: ["time","thread","logVersion","uid","sign","traceId","cc","aa","bb"] ignore_missing: true - drop_event: when: not: equals: "operationLog": "LogRecord"