算子清洗功能介绍
功能说明
Filebeat是一个轻量级的采集器,用于采集和处理,转发日志数据。Filebeat将作为代理安装在您的服务器上,监控您指定的日志文件或位置,收集日志事件,并将它们转发到kafka,推送到下一个处理点。
processors是filebeat定义了一系列对单条日志操作的方法,目前支持三类操作:
- 减少导出字段的数量
- 使用附加元数据增加字段
- 执行额外的处理或解码
每个processor接受一个事件(单条日志),将用户定义的操作应用于该事件并返回。如果您定义了一个处理器列表,它们将按照在filebeat配置文件中定义的顺序执行。任何一个算子执行失败,都会直接终止执行链条,并将异常事件发送到fail_to_topic,异常信息会记录在事件中的@errMsg中。
算子配置基本结构
算子配置基本配置结构如下,具体可参见官方文档。
processors:
- <processor_name>:
when:
<condition>
<parameters>
- <processor_name>:
when:
<condition>
<parameters>
其中<processor_name>指定某个算子操作,例如add_fields等。<condition>指定一个可选条件,如果条件存在,则只有在满足条件才执行算子,如果未设置任何条件,则始终执行算子。<parameters>是传递给算子的参数列表。
更复杂的条件处理可以通过使用if-then-else条件表达式配置来完成,这允许基于单个条件执行多个处理器。
processors:
- if:
<condition>
then:
- <processor_name>:
<parameters>
- <processor_name>:
<parameters>
...
else:
- <processor_name>:
<parameters>
- <processor_name>:
<parameters>
...
if-then-else条件表达式中,then是必须的,else是可选的。
在日志采集配置场景中,定义的processor只能针对某个input进行配置,没有全局配置。
算子介绍
filebeat自带的processor算子,如下:
add_cloud_metadata、add_cloudfoundry_metadata、add_docker_metadata、add_fields、add_host_metadata、add_id、add_kubernetes_metadata、add_labels、add_locale、add_observer_metadata、add_process_metadata、add_tags、community_id、convert、copy_fields、decode_base64_field、decode_cef、decode_csv_fields、decode_json_fields、decompress_gzip_field、dissect、dns、drop_event、drop_fields、extract_array、fingerprint、include_fields、registered_domain、rename、script、timestamp、translate_sid、truncate_fields、urldecode。
其他算子介绍如表1所示。
|
算子 |
算子能力 |
算子样例 |
|---|---|---|
|
equals |
比较字段是否具有特定值,只接受整数或字符串值,常和concat、when、and等算子组合使用。 |
例如:如果message中的type="CLI",then接后续逻辑。 - if:
equals:
type: "CLI"
then: |
|
contains |
检查一个值是否是个字段的一部分,该字段可以是字符串或字符串数组。条件只接受一个字符串值。常和if、equals、and、concat算子组合使用。 |
例如:检查日志字段中的@logtype是否含有modelservicegateway_interface_Log,then接后续逻辑。 - if:
contains:
"@logType": "modelservicegateway_interface_Log"
then: |
|
regexp |
校验字符串是否满足正则表达式,只接受字符串。 |
例如:以下条件检查进程名称是否以foo开头: regexp: system.process.name: "^foo.*" |
|
range |
检查该字段是在一定范围内的值,支持小于lt、小于等于lte、大于gt和大于等于gte,条件只接受整数或浮点值。 |
例如:以下条件通过将http.response.code字段与400进行比较来检查失败的HTTP事务。 range: http.response.code: gte: 400 这也可以写成: range: http.response.code.gte: 400 以下条件检查CPU使用率的百分比值是否介于0.5和0.8之间。 range: system.cpu.user.pct.gte: 0.5 system.cpu.user.pct.lt: 0.8 |
|
network |
检查该字段是否在某个IP网络范围内,支持IPv4和IPv6地址。可以使用CIDR表示法指定网络范围,例如“192.0.2.0/24”或“2001:db8::/32”,或使用以下命名范围之一:
|
如果source.ip值在私有地址空间内,则以下条件返回true。 network: source.ip: private 如果destination.ip值的IPv4范围内192.168.1.0 - 192.168.1.255,则以下条件返回true。 network: destination.ip: '192.168.1.0/24' 当destination.ip在任何给定的子网中时,则以下条件返回true 。 network: destination.ip: ['192.168.1.0/24', '10.0.0.0/8', loopback] |
|
has_fields |
如果事件存在的所有给定的领域,该条件接受表示字段名称的字符串值列表。 |
例如:以下条件检查http.response.code字段是否存在于事件中。 has_fields: ['http.response.code'] |
|
or |
逻辑条件表达式,只需满足其中一个条件 |
例如:配置条件appId等于null或者为空,则把appId3的值赋值给appId。 - if:
or:
- equals:
appId: "null"
- equals:
appId: ""
then:
- assign:
source: ["appId3"]
target_field: "appId" |
|
and |
逻辑条件表达式,同时满足两个或多个条件,常见组合算子有if、or、contains、 equals等算子。 |
例如:判断某个字段值为""且message中的@logtype字段含有modelservicegateway_interface_Log,就将字段modelCost1赋值给modelCost字段。 - if:
and:
- equals:
modelCost: ""
- contains:
"@logType": "modelservicegateway_interface_Log"
then:
- assign:
source: ["modelCost1"]
target_field: "modelCost" |
|
not |
接收否定逻辑表达。 |
例如:配置条件@logtype不为modelservicegateway_interface_Log时,把@logtype字段的值赋值给clusterName字段。 - if:
not:
equals:
" @logType": "modelservicegateway_interface_Log"
then:
- assign:
source: ["@logType"]
target_field: "clusterName" |
|
replace |
针对需要对维度降维或者其他替换场景。 |
例如:将字段clusterName中的_modelservicegateway_interface_Log替换成""。 - replace:
fail_on_error: false
ignore_missing: true
fields:
- {field: "clusterName",pattern: '_modelservicegateway_interface_Log',replacement: ""}
|
|
dissect |
根据指定的分隔符切分字符串。 |
使用dissect算子更好地组织和解析数据,便于后续的分析或处理,使用示例如下: - dissect:
tokenizer: "%{key1} %{key2} %{key3|convert_datatype}"
field: "message"
target_prefix: ""
如果message字段的内容是“key1=value1 key2=value2 key3=2022-01-01”,经过dissect操作后将会生成新的字段,key1的值为 “value1”,key2为 “value2”,key3或key3_date(进行了转换)为日期格式的 “2022-01-01”。 |
|
timestamp |
增加配置目标时间字段格式。 |
时间戳算子仅支持以时间对象设置目标字段,为了适配业务对时间戳的诉求,特殊增加了“target_layout”字段,用来配置目标格式,增加“target_timezone”字段,来配置目标时区。 - timestamp:
field: timeString
target_field: timestamp
target_layout: "UNIX_MS"
timezone: "xxx/xxxx"
layouts:
- '2006-01-02 15:04:05.999'
test:
- '2021-02-11 17:27:25.025'
target_layout支持的格式如下:
|
|
assign |
根据条件赋值。 |
- assign:
source: ["spVolumeId","vod_id","campAlias"]
target_field: "content_id_a"
|
|
concat |
字段之间或者字段和字符串之间拼接 |
- concat:
when:
and:
- not:
equals:
source: ""
- not:
equals:
target: ""
fields:
- { field: "%{source}"}
- { field: "->"}
- { field: "%{target}"}
target_field: interface_type
|
|
decode_json |
对某一个json字段进行解析。 |
例如:对retmsg进行解析。 - decode_json:
fields:
- field: "retmsg"
filters:
- {target_field: "user_id",type: "string",filter_keys: ["useId"]}
|
|
format_tuple |
针对有简单规律的字符串结构解析,比如url参数解析。 |
- format_tuple:
field: bizTag
collection_items_terminated_by: "&"
map_key_terminated_by: "="
filters:
- { target_field: "userAgent", filter_keys: ["user-agent","useragent","user_agent"], default: "-" }
- { target_field: "serviceType", filter_keys: ["servicetype","service_type"], default: "-" }
如果输入bizTag的值是:"user-agent= Office&servicetype=wiseOap&otherArrtributes=other"解析后日志为:{"userAgent":" Office","serviceType":"wiseOap"}
|
|
mapping |
将事件的值做一个映射。 |
- mapping:
dict:
contentserver: "VideoContentServerService"
contentmanage: "VideoContentManageService"
onsaleserver: "VideoContentOnsaleService"
playserver: "VideoPlayServerService"
kmsserver: "VideoKMSService"
drmmanage: "VideoDRMManageService"
drmproxy: "VideoOpenDRMProxyService"
opendrmmanage: "VideoOpenDRMManageService"
cads: "VideoOpenDRMCADSService"
quickserver: "VideoQuickServerService"
shortmanage: "VideoShortManageService"
shortserver: "VideoShortServerService"
shortonsale: "VideoShortOnsaleService"
sysmanage: "VideoSysManageService"
sysserver: "VideoSysServerService"
behavaiorservice: "VideoUserBehaviorService"
campaignservice: "VideoUserCampaignService"
poservice: "VideoOrderService"
productservice: "VideoProductService"
rightservice: "VideoUserRightService"
userservice: "VideoUserAuthService"
rules:
- { source_key: "source" }
- { source_key: "target_pre" }
|
|
segment |
分段算子,对一个字段通过一批判断逻辑后,简单的赋值操作,比如针对时延分段。 |
- segment:
field: response_time
target_field: delay_section
default: "-1"
rules:
- { label: "0-50",operation: [{operation_type: "gte", value: 0},{operation_type: "lt",value: 50}]}
- { label: "50-100",operation: [{operation_type: "gte",value: 50},{operation_type: "lt",value: 100}]}
- { label: "100-200",operation: [{operation_type: "gte",value: 100},{operation_type: "lt",value: 200}]}
- { label: "200-500",operation: [{operation_type: "gte",value: 200},{operation_type: "lt",value: 500}]}
- { label: "500-1000",operation: [{operation_type: "gte",value: 500},{operation_type: "lt",value: 1000}]}
- { label: "1000-2000",operation: [{operation_type: "gte",value: 1000},{operation_type: "lt",value: 2000}]}
- { label: "2000-5000",operation: [{operation_type: "gte",value: 2000},{operation_type: "lt",value: 5000}]}
- { label: "5000-10000",operation: [{operation_type: "gte",value: 5000},{operation_type: "lt",value: 10000}]}
- { label: ">1000",operation: [{operation_type: "gte",value: 10000}]}
|
|
substring |
用来裁剪字符串使用,比如针对拼接的traceId,裁剪获得各部分的值。 |
- substring:
fail_on_error: false
ignore_missing: true
fields:
- {from: "transaction_id", begin_index: 0,end_index: 38, to: "x_traceId" }
- {from: "transaction_id", begin_index: 0,end_index: 32, to: "x_traceid_begin" }
- {from: "transaction_id", begin_index: -20, to: "x_traceid_spanId" }
|
|
timeout |
该算子主要是为了预防用户配置错误导致采集到过早历史的日志。为了系统的稳定运行,不要采集过早时间的日志做分析,此算子必须配置 。 |
例如,检查当前日志的timestamp字段,这个字段的值是毫秒形式时间戳。如果当前时间减去timestamp对应的时间小于等于120分钟,那么日志正常发送,如果大于120分钟,表示出现异常,日志发送到fail_to_topic。 - timeout
field: "timestamp"
layout: "UNIX_MS"
rules:
- {operation_type: "lte", value: 120}
|
算子组合使用样例
日志样例
2022-11-15 19:51:43.735 [20221115T115143]|AI||system|system|inner_ip|ADD|LogRecord|AIOperationLog(id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceId=null, identityId=1111111111111, userId=test, timestamp=2022-11-15 19:51:43.67, operationType=WELINK_TRIGGER)||Y - serviceId:null
算子功能
对于非json格式的如何转化成json直接解析拿到key、value。例如id=12d6ffbd-f371-4222-aa69-167bcd7ba3ee, serviceId=null, identityId=1111111111111, userId=test, timestamp=2022-11-15 19:51:43.67, operationType=WELINK_TRIGGER。
算子脚本
- dissect:
tokenizer: '%{time}|%{thread}|%{logVersion}|%{uid}|%{sign}|%{traceId}|%{cc}|%{operationLog}|%{bb}|%{aa}'
field: "message"
target_prefix: ""
trim_values: "none"
ignore_failure: true
- replace:
fail_on_error: false
ignore_missing: true
fields:
- {field: "bb",pattern: 'AIOperationLog\(',replacement: "{\""}
- {field: "bb",pattern: '\)',replacement: "\"}"}
- {field: "bb",pattern: ' ,',replacement:"\""}
- if:
contains:
"operationLog": "LogRecord"
then:
- replace:
fail_on_error: false
ignore_missing: true
fields:
- {field: "bb",pattern: '=',replacement: "\":\""}
- {field: "bb",pattern: ' ',replacement:""}
- {field: "bb",pattern: '\,',replacement: "\",\""}
- decode_json:
ignore_missing: true
ignore_failure: true
fields:
- field: "bb"
filters:
- {target_field: "id",type: "string",filter_keys: ["id"]}
- {target_field: "serviceId",type: "string",filter_keys: ["serviceId"]}
- {target_field: "identityId",type: "string",filter_keys: ["identityId"]}
- {target_field: "userId",type: "string",filter_keys: ["userId"]}
- {target_field: "timestamp",type: "string",filter_keys: ["timestamp"]}
- {target_field: "operationType",type: "string",filter_keys: ["operationType"]}
- drop_fields:
fields: ["time","thread","logVersion","uid","sign","traceId","cc","aa","bb"]
ignore_missing: true
- drop_event:
when:
not:
equals:
"operationLog": "LogRecord"