网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
计算
弹性云服务器 ECS
Flexus云服务
裸金属服务器 BMS
云手机服务器 CPH
专属主机 DeH
弹性伸缩 AS
镜像服务 IMS
函数工作流 FunctionGraph
云耀云服务器(旧版)
VR云渲游平台 CVR
Huawei Cloud EulerOS
云化数据中心 CloudDC
网络
虚拟私有云 VPC
弹性公网IP EIP
弹性负载均衡 ELB
NAT网关 NAT
云专线 DC
虚拟专用网络 VPN
云连接 CC
VPC终端节点 VPCEP
企业路由器 ER
企业交换机 ESW
全球加速 GA
企业连接 EC
云原生应用网络 ANC
CDN与智能边缘
内容分发网络 CDN
智能边缘云 IEC
智能边缘平台 IEF
CloudPond云服务
安全与合规
安全技术与应用
Web应用防火墙 WAF
企业主机安全 HSS
云防火墙 CFW
安全云脑 SecMaster
DDoS防护 AAD
数据加密服务 DEW
数据库安全服务 DBSS
云堡垒机 CBH
数据安全中心 DSC
云证书管理服务 CCM
威胁检测服务 MTD
态势感知 SA
认证测试中心 CTC
边缘安全 EdgeSec
大数据
MapReduce服务 MRS
数据湖探索 DLI
表格存储服务 CloudTable
可信智能计算服务 TICS
推荐系统 RES
云搜索服务 CSS
数据可视化 DLV
数据接入服务 DIS
数据仓库服务 GaussDB(DWS)
数据治理中心 DataArts Studio
湖仓构建 LakeFormation
智能数据洞察 DataArts Insight
应用中间件
微服务引擎 CSE
分布式消息服务Kafka版
分布式消息服务RabbitMQ版
分布式消息服务RocketMQ版
API网关 APIG
分布式缓存服务 DCS
多活高可用服务 MAS
事件网格 EG
开天aPaaS
应用平台 AppStage
开天企业工作台 MSSE
开天集成工作台 MSSI
API中心 API Hub
云消息服务 KooMessage
交换数据空间 EDS
云地图服务 KooMap
云手机服务 KooPhone
组织成员账号 OrgID
云空间服务 KooDrive
管理与监管
统一身份认证服务 IAM
消息通知服务 SMN
云监控服务 CES
应用运维管理 AOM
应用性能管理 APM
云日志服务 LTS
云审计服务 CTS
标签管理服务 TMS
配置审计 Config
应用身份管理服务 OneAccess
资源访问管理 RAM
组织 Organizations
资源编排服务 RFS
优化顾问 OA
IAM 身份中心
云运维中心 COC
资源治理中心 RGC
区块链
区块链服务 BCS
数字资产链 DAC
华为云区块链引擎服务 HBS
解决方案
高性能计算 HPC
SAP
混合云灾备
开天工业工作台 MIW
Haydn解决方案工厂
数字化诊断治理专家服务
价格
成本优化最佳实践
专属云商业逻辑
云生态
云商店
合作伙伴中心
华为云开发者学堂
华为云慧通差旅
其他
管理控制台
消息中心
产品价格详情
系统权限
客户关联华为云合作伙伴须知
公共问题
宽限期保留期
奖励推广计划
活动
云服务信任体系能力说明
开发与运维
软件开发生产线 CodeArts
需求管理 CodeArts Req
流水线 CodeArts Pipeline
代码检查 CodeArts Check
编译构建 CodeArts Build
部署 CodeArts Deploy
测试计划 CodeArts TestPlan
制品仓库 CodeArts Artifact
移动应用测试 MobileAPPTest
CodeArts IDE Online
开源镜像站 Mirrors
性能测试 CodeArts PerfTest
应用管理与运维平台 ServiceStage
云应用引擎 CAE
开源治理服务 CodeArts Governance
华为云Astro轻应用
CodeArts IDE
Astro工作流 AstroFlow
代码托管 CodeArts Repo
漏洞管理服务 CodeArts Inspector
联接 CodeArtsLink
软件建模 CodeArts Modeling
Astro企业应用 AstroPro
CodeArts盘古助手
华为云Astro大屏应用
存储
对象存储服务 OBS
云硬盘 EVS
云备份 CBR
高性能弹性文件服务 SFS Turbo
弹性文件服务 SFS
存储容灾服务 SDRS
云硬盘备份 VBS
云服务器备份 CSBS
数据快递服务 DES
云存储网关 CSG
专属分布式存储服务 DSS
数据工坊 DWR
地图数据 MapDS
键值存储服务 KVS
容器
云容器引擎 CCE
云容器实例 CCI
容器镜像服务 SWR
云原生服务中心 OSC
应用服务网格 ASM
华为云UCS
数据库
云数据库 RDS
数据复制服务 DRS
文档数据库服务 DDS
分布式数据库中间件 DDM
云数据库 GaussDB
云数据库 GeminiDB
数据管理服务 DAS
数据库和应用迁移 UGO
云数据库 TaurusDB
人工智能
AI开发平台ModelArts
华为HiLens
图引擎服务 GES
图像识别 Image
文字识别 OCR
自然语言处理 NLP
内容审核 Moderation
图像搜索 ImageSearch
医疗智能体 EIHealth
企业级AI应用开发专业套件 ModelArts Pro
人脸识别服务 FRS
对话机器人服务 CBS
语音交互服务 SIS
人证核身服务 IVS
视频智能分析服务 VIAS
城市智能体
自动驾驶云服务 Octopus
盘古大模型 PanguLargeModels
IoT物联网
设备接入 IoTDA
全球SIM联接 GSL
IoT数据分析 IoTA
路网数字化服务 DRIS
IoT边缘 IoTEdge
设备发放 IoTDP
企业应用
域名注册服务 Domains
云解析服务 DNS
企业门户 EWP
ICP备案
商标注册
华为云WeLink
华为云会议 Meeting
隐私保护通话 PrivateNumber
语音通话 VoiceCall
消息&短信 MSGSMS
云管理网络
SD-WAN 云服务
边缘数据中心管理 EDCM
云桌面 Workspace
应用与数据集成平台 ROMA Connect
ROMA资产中心 ROMA Exchange
API全生命周期管理 ROMA API
政企自服务管理 ESM
视频
实时音视频 SparkRTC
视频直播 Live
视频点播 VOD
媒体处理 MPC
视频接入服务 VIS
数字内容生产线 MetaStudio
迁移
主机迁移服务 SMS
对象存储迁移服务 OMS
云数据迁移 CDM
迁移中心 MGC
专属云
专属计算集群 DCC
开发者工具
SDK开发指南
API签名指南
DevStar
华为云命令行工具服务 KooCLI
Huawei Cloud Toolkit
CodeArts API
云化转型
云架构中心
云采用框架
用户服务
账号中心
费用中心
成本中心
资源中心
企业管理
工单管理
客户运营能力
国际站常见问题
支持计划
专业服务
合作伙伴支持计划
我的凭证
华为云公共事业服务云平台
工业软件
工业数字模型驱动引擎
硬件开发工具链平台云服务
工业数据转换引擎云服务

Flume业务配置指南

更新时间:2024-10-22 GMT+08:00
分享

本章节适用于MRS 3.x及之后版本。

该操作指导用户完成Flume常用业务的配置。其他一些不太常用的Source、Channel、Sink的配置请参考Flume社区提供的用户手册(http://flume.apache.org/releases/1.9.0.html)。

说明:
  • 各个表格中所示参数,黑体加粗的参数为必选参数。
  • Sink的BatchSize参数必须小于Channel的transactionCapacity。
  • 集群Flume配置工具界面篇幅有限,Source、Channel、Sink只展示部分参数,详细请参考如下常用配置。
  • 集群Flume配置工具界面上所展示Customer Source、Customer Channel及Customer Sink需要用户根据自己开发的代码来进行配置,下述常用配置不再展示。

常用Source配置

  • Avro Source

    Avro Source监测Avro端口,接收外部Avro客户端数据并放入配置的Channel中。常用配置如下表所示:

    表1 Avro Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    avro

    avro source的类型,必须为avro。

    bind

    -

    监测主机名/IP。

    port

    -

    绑定监测端口,该端口需未被占用。

    threads

    -

    source工作的最大线程数。

    compression-type

    none

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    compression-level

    6

    数据压缩级别(1-9),数值越高,压缩率越高。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥用不同的密码进行保护,而PKCS12的密钥库和私钥用相同密码进行保护。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    trust-all-certs

    false

    是否关闭SSL server证书检查。设置为“true”时将不会检查远端source的SSL server证书,不建议在生产中使用。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

    ipFilter

    false

    是否开启ip过滤。

    ipFilter.rules

    -

    定义N网络的ipFilters,多个主机或IP地址用逗号分。ipFilter设置为“true”时,配置规则有允许和禁止两种,配置格式如下:

    ipFilterRules=allow:ip:127.*, allow:name:localhost, deny:ip:*

  • SpoolDir Source

    Spool Dir Source监控并传输目录下新增的文件,可实现实时数据传输。常用配置如下表所示:

    表2 Spooling Directory Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    spooldir

    spooling source的类型,必须设置为spooldir。

    spoolDir

    -

    Spooldir source的监控目录,flume运行用户需要对该目录具有可读可写可执行权限。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。

    fileSuffix

    .COMPLETED

    文件传输完成后添加的后缀。

    deletePolicy

    never

    文件传输完成后源文件删除策略,never或immediate。“never”表示不删除已完成传输的源文件,“immediate”表示传输完成后立刻删除源文件。

    ignorePattern

    ^$

    忽略文件的正则表达式表示。默认为“^$”,表示忽略空格。

    includePattern

    ^.*$

    包含文件的正则表达式表示。可以与ignorePattern同时使用,如果一个文件既满足ignorePattern也满足includePattern,则该文件会被忽略。另外,以“.”开头的文件不会被过滤。

    trackerDir

    .flumespool

    传输过程中元数据存储路径。

    batchSize

    1000

    批次写入Channel的Event数量。

    decodeErrorPolicy

    FAIL

    编码错误策略。

    说明:

    如果文件中有编码错误,请配置“decodeErrorPolicy”为“REPLACE”或“IGNORE”,Flume遇到编码错误将跳过编码错误,继续采集后续日志。

    deserializer

    LINE

    文件解析器,值为“LINE”“BufferedLine”

    • 配置为“LINE”时,对从文件读取的字符逐个转码。
    • 配置为“BufferedLine”时,对文件读取的一行或多行的字符进行批量转码,性能更优。

    deserializer.maxLineLength

    2048

    按行解析最大长度。

    deserializer.maxBatchLine

    1

    按行解析最多行数,如果行数设置为多行,maxLineLength也应该设置为相应的倍数。

    说明:

    用户设置Interceptor时,需要考虑多行合并后的场景,否则会造成数据丢失。如果Interceptor无法处理多行合并场景,请将该配置设置为1。

    selector.type

    replicating

    选择器类型,“replicating”或“multiplexing”。“replicating”表示将数据复制多份,分别传递给每一个channel,每个channel接收到的数据都是相同的,而“multiplexing”表示根据event中header的value来选择特定的channel,每个channel中的数据是不同的。

    interceptors

    -

    拦截器。多个拦截器用空格分开。

    inputCharset

    UTF-8

    读取文件的编码格式。须与读取数据源文件编码格式相同,否则字符解析可能会出错。

    fileHeader

    false

    是否把文件名(包含路径)添加到event的header中。

    fileHeaderKey

    -

    设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。若fileHeader设置为true,可参考如下示例。

    示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。

    basenameHeader

    false

    是否把文件名(不包含路径)添加到event的header中。

    basenameHeaderKey

    -

    设置header中数据存储结构为<key,value>模式,需要basenameHeaderKey与basenameHeader配合使用。若basenameHeader设置为true,可参考如下示例。

    示例:将basenameHeaderKey定义为file,当读取到文件名为a.txt的内容时,header中以file=a.txt的形式存在。

    pollDelay

    500

    轮询监控目录下新文件时的时延。单位:毫秒。

    recursiveDirectorySearch

    false

    是否监控配置的目录下子目录中的新文件。

    consumeOrder

    oldest

    监控目录下文件的消耗次序。如果配置为oldest或者youngest,会根据监控目录下文件的最后修改时间来决定,当目录下有大量文件时,会消耗较长时间去寻找oldest或者youngest的文件。需要注意的是,如果配置为random,创建比较早的文件有可能长时间未被读取。如果配置为oldest或者youngest,那么进程会需要较多时间来查找最新的或最旧的文件。可选值:random,youngest,oldest。

    maxBackoff

    4000

    当Channel满了以后,尝试再次去写Channel所等待的最大时间。超过这个时间,则会发生异常。对应的Source会以一个较小的时间开始,然后每尝试一次,该时间数字指数增长直到达到当前指定的值,如果还不能成功写入,则认为失败。时间单位:秒。

    emptyFileEvent

    true

    是否采集空文件信息发送到Sink端,默认值为true,表示将空文件信息发送到Sink端。该参数只对HDFS Sink有效,其他Sink该参数无效。以HDFS Sink为例,当参数为true时,如果spoolDir路径下存在空文件,那么HDFS的hdfs.path路径下就会创建一个同名的空文件。

    说明:

    SpoolDir Source在按行读取过程中会忽略掉每一个event的最后一个换行符,该换行符所占用的数据量指标不会被Flume统计。

  • Kafka Source

    Kafka Source从Kafka的topic中消费数据,可以设置多个Source消费同一个topic的数据,每个Source会消费topic的不同partitions。常用配置如下表所示:

    表3 Kafka Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    org.apache.flume.source.kafka.KafkaSource

    kafka source的类型,必须设置为org.apache.flume.source.kafka.KafkaSource。

    kafka.bootstrap.servers

    -

    Kafka的bootstrap地址端口列表。如果集群已安装Kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    kafka.topics

    -

    订阅的Kafka topic列表,用逗号分隔。

    kafka.topics.regex

    -

    符合正则表达式的topic会被订阅,优先级高于“kafka.topics”,如果存在将覆盖“kafka.topics”。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒。

    nodatatime

    0(不开启)

    告警阈值,从Kafka中订阅不到数据的时长超过阈值时发送告警,单位:秒。该参数可在配置文件properties.properties进行设置。

    batchSize

    1000

    批次写入Channel的Event数量。

    batchDurationMillis

    1000

    批次消费topic数据的最大时长,单位:ms。

    keepTopicInHeader

    false

    是否在Event Header中保存topic。设置为true,则Kafka Sink配置的topic将无效。

    setTopicHeader

    true

    当设置为true时,会将“topicHeader”中定义的topic名称存储到Header中。

    topicHeader

    topic

    当setTopicHeader属性设置为true,此参数用于定义存储接收的topic名称。如果与Kafka Sink的topicHeader属性结合使用,应该注意,避免将消息循环发送到同一主题。

    useFlumeEventFormat

    false

    默认情况下,event会以字节的形式从kafka topic传递到event的body体中。设置为true,则会以Flume的Avro二进制格式来读取Event。与KafkaSink或KakfaChannel 中同名的parseAsFlumeEvent参数一起使用时,会保留从数据源产生的任何设定的Header。

    keepPartitionInHeader

    false

    是否在Event Header中保存partitionID。设置为true,则Kafka Sink将写入对应的Partition。

    kafka.consumer.group.id

    flume

    Kafka消费组ID。多个源或代理中设置相同的ID表示它们是同一个consumer group。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    Other Kafka Consumer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的消费配置,配置需要加前缀“kafka.”。

  • Taildir Source

    Taildir Source监控目录下文件的变化并自动读取文件内容,可实现实时数据传输,常用配置如下表所示:

    表4 Taildir Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    TAILDIR

    taildir source的类型,必须为TAILDIR。

    filegroups

    -

    设置采集文件目录分组名字,分组名字中间使用空格间隔。

    filegroups.<filegroupName>.parentDir

    -

    父目录,需要配置为绝对路径。

    filegroups.<filegroupName>.filePattern

    -

    相对父目录的文件路径,可以包含目录,支持正则表达式,须与父目录联合使用。

    positionFile

    -

    传输过程中元数据存储路径。

    headers.<filegroupName>.<headerKey>

    -

    设置某一个分组采集数据时event中的key-value值。

    byteOffsetHeader

    false

    是否在每一个event头中携带该event在源文件中的位置信息。设置为true,则该信息保存在byteoffset变量中。

    maxBatchCount

    Long.MAX_VALUE

    控制从一个文件中连续读取的最大批次。如果监控目录会一直读取多个文件,且其中一个文件以非常快的速率在写入,那么其他文件可能会无法处理。因为高速写入的这个文件会陷入无限读取的循环中。这种情况下,应该降低此值。

    skipToEnd

    false

    Flume在重启后是否直接定位到文件最新的位置处读取最新的数据。设置为true,则重启后直接定位到文件最新位置读取最新数据。

    idleTimeout

    120000

    设置读取文件的空闲时间,单位:毫秒,如果在该时间内文件内容没有变更,关闭掉该文件,关闭后如果该文件有数据写入,重新打开并读取数据。

    writePosInterval

    3000

    设置将元数据写入到文件的周期,单位:毫秒。

    batchSize

    1000

    批次写入Channel的Event数量。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Source,单位:秒

    fileHeader

    false

    是否把文件名(包含路径)添加到event的header中。

    fileHeaderKey

    file

    设置header中数据存储结构为<key,value>模式,需要fileHeaderKey与fileHeader配合使用。若fileHeader设置为true,可参考如下示例。

    示例:将fileHeaderKey定义为file,当读取到文件名为/root/a.txt的内容时,header中以file=/root/a.txt的形式存在。

  • Http Source

    Http Source接收外部HTTP客户端发送过来的数据,并放入配置的Channel中,常用配置如下表所示:

    表5 Http Source常用配置

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    http

    http source的类型,必须为http。

    bind

    -

    监测主机名/IP。

    port

    -

    绑定监测端口,该端口需未被占用。

    handler

    org.apache.flume.source.http.JSONHandler

    http请求的消息解析方式,支持Json格式解析(org.apache.flume.source.http.JSONHandler)和二进制Blob块解析(org.apache.flume.sink.solr.morphline.BlobHandler)。

    handler.*

    -

    设置handler的参数。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

    include-cipher-suites

    -

    包含的协议列表,用空格分开。如果设置为空,则默认支持所有协议。

    enableSSL

    false

    http协议是否启用SSL。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    keystore-type

    JKS

    Keystore类型,可以为JKS或者PKCS12。

    keystore

    -

    http启用SSL后设置keystore的路径。

    keystorePassword

    -

    http启用SSL后设置keystore的密码。

  • Thrift Source

    Thrift Source监测thrift端口,接收外部Thrift客户端数据并放入配置的Channel中。常用配置如下表所示:

    参数

    默认值

    描述

    channels

    -

    与之相连的channel,可以配置多个。

    type

    thrift

    thrift source的类型,必须设置为thrift。

    bind

    -

    监测主机名/IP。

    port

    -

    绑定监测端口,该端口需未被占用。

    threads

    -

    允许运行的最大的worker线程数目。

    kerberos

    false

    是否启用Kerberos认证。

    agent-keytab

    -

    服务端使用的keytab文件地址,必须使用机机账号。建议使用Flume服务安装目录下flume/conf/flume_server.keytab。

    agent-principal

    -

    服务端使用的安全用户的Principal,必须使用机机账户。建议使用Flume服务默认用户flume_server/hadoop.<系统域名>@<系统域名>

    说明:

    “flume_server/hadoop.<系统域名>为用户名,用户的用户名所包含的系统域名所有字母为小写。例如“本端域”参数为“9427068F-6EFA-4833-B43E-60CB641E5B6C.COM”,用户名为“flume_server/hadoop.9427068f-6efa-4833-b43e-60cb641e5b6c.com”。

    compression-type

    none

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    keystore-type

    JKS

    SSL启用后密钥存储类型。

    keystore

    -

    SSL启用后密钥存储文件路径,开启SSL后,该参数必填。

    keystore-password

    -

    SSL启用后密钥存储密码,开启ssl后,该参数必填。

常用Channel配置

  • Memory Channel

    Memory Channel使用内存作为缓存区,Events存放在内存队列中。常用配置如下表所示:

    表6 Memory Channel常用配置

    参数

    默认值

    描述

    type

    -

    memory channel的类型,必须设置为memory。

    capacity

    10000

    缓存在channel中的最大Event数。

    transactionCapacity

    1000

    每次存取的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。

    byteCapacity

    JVM最大内存的80%

    channel中最多能容纳所有event body的总字节数,默认是 JVM最大可用内存(-Xmx )的80%,单位:bytes。

    byteCapacityBufferPercentage

    20

    channel中字节容量百分比(%)。

  • File Channel

    File Channel使用本地磁盘作为缓存区,Events存放在设置的dataDirs配置项文件夹中。常用配置如下表所示:

    表7 File Channel常用配置

    参数

    默认值

    描述

    type

    -

    file channel的类型,必须设置为file。

    checkpointDir

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/checkpoint

    说明:

    此路径随自定义数据路径变更。

    检查点存放路径。

    dataDirs

    ${BIGDATA_DATA_HOME}/hadoop/data1~N/flume/data

    说明:

    此路径随自定义数据路径变更。

    数据缓存路径,设置多个路径可提升性能,中间用逗号分开。

    maxFileSize

    2146435071

    单个缓存文件的最大值,单位:bytes。

    minimumRequiredSpace

    524288000

    缓冲区空闲空间最小值,单位:bytes。

    capacity

    1000000

    缓存在channel中的最大Event数。

    transactionCapacity

    10000

    每次存取的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    useDualCheckpoints

    false

    是否备份检查点。设置为“true”时,必须设置backupCheckpointDir的参数值。

    backupCheckpointDir

    -

    备份检查点路径。

    checkpointInterval

    30000

    检查点间隔时间,单位:秒。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。单位:秒。

    use-log-replay-v1

    false

    是否启用旧的回复逻辑。

    use-fast-replay

    false

    是否使用队列回复。

    checkpointOnClose

    true

    channel关闭时是否创建检查点。

  • Memory File Channel

    Memory File Channel同时使用内存和本地磁盘作为缓存区,消息可持久化,性能优于File Channel,接近Memory Channel的性能。此Channel目前处于试验阶段,可靠性不够高,不建议在生产环境使用。常用配置如下表所示:

    表8 Memory File Channel常用配置

    参数

    默认值

    描述

    type

    org.apache.flume.channel.MemoryFileChannel

    memory file channel的类型,必须设置为“org.apache.flume.channel.MemoryFileChannel”

    capacity

    50000

    Channel缓存容量:缓存在Channel中的最大Event数。

    transactionCapacity

    5000

    事务缓存容量:一次事务能处理的最大Event数。

    说明:
    • 此参数值需要大于source和sink的batchSize。
    • 事务缓存容量必须小于或等于Channel缓存容量。

    subqueueByteCapacity

    20971520

    每个subqueue最多保存多少byte的Event,单位:byte。

    Memory File Channel采用queue和subqueue两级缓存,event保存在subqueue,subqueue保存在queue。

    subqueue能保存多少event,由“subqueueCapacity”“subqueueInterval”两个参数决定,“subqueueCapacity”限制subqueue内的Event总容量,“subqueueInterval”限制subqueue保存Event的时长,只有subqueue达到“subqueueCapacity”“subqueueInterval”上限时,subqueue内的Event才会发往目的地。

    说明:

    “subqueueByteCapacity”必须大于一个batchsize内的Event总容量。

    subqueueInterval

    2000

    每个subqueue最多保存一段多长时间的Event,单位:毫秒。

    keep-alive

    3

    当事务缓存或Channel缓存满时,Put、Take线程等待时间。

    单位:秒。

    dataDir

    -

    缓存本地文件存储目录。

    byteCapacity

    JVM最大内存的80%

    Channel缓存容量。

    单位:bytes。

    compression-type

    None

    消息压缩格式:“none”或“deflate”。“none”表示不压缩,“deflate”表示压缩。

    channelfullcount

    10

    channel full次数,达到该次数后发送告警。

    Memory File Channel配置样例:

    server.channels.c1.type = org.apache.flume.channel.MemoryFileChannel
    server.channels.c1.dataDir = /opt/flume/mfdata
    server.channels.c1.subqueueByteCapacity = 20971520
    server.channels.c1.subqueueInterval=2000
    server.channels.c1.capacity = 500000
    server.channels.c1.transactionCapacity = 40000
  • Kafka Channel
    Kafka Channel使用Kafka集群缓存数据,Kafka提供高可用、多副本,以防Flume或Kafka Broker崩溃,Channel中的数据会立即被Sink消费。
    表9 Kafka channel 常用配置

    Parameter

    Default Value

    Description

    type

    -

    kafka channel的类型,必须设置为 “org.apache.flume.channel.kafka.KafkaChannel”

    kafka.bootstrap.servers

    -

    Kafka的bootstrap地址端口列表。

    如果集群已安装Kafka并且配置已经同步,则服务端可以不配置此项,默认值为Kafka集群中所有的broker列表。客户端必须配置该项,多个值用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    kafka.topic

    flume-channel

    channel用来缓存数据的topic。

    kafka.consumer.group.id

    flume

    从kafka中获取数据的组标识,此参数不能为空。

    parseAsFlumeEvent

    true

    是否解析为Flume event。

    migrateZookeeperOffsets

    true

    当Kafka没有存储offset时,是否从ZooKeeper中查找,并提交到Kafka。

    kafka.consumer.auto.offset.reset

    latest

    当没有offset记录时从什么位置消费,可选为“earliest”、“latest”或“none”。“earliest”表示将offset重置为初始点,“latest”表示将offset置为最新位置点,“none”表示若没有offset则发生异常。

    kafka.producer.security.protocol

    SASL_PLAINTEXT

    Kafka生产安全协议。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    说明:

    若该参数没有显示,请单击弹窗左下角的"+"显示全部参数。

    kafka.consumer.security.protocol

    SASL_PLAINTEXT

    同上,但用于消费。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    pollTimeout

    500

    consumer调用poll()函数能接受的最大超时时间,单位:毫秒。

    ignoreLongMessage

    false

    是否丢弃超大消息。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

常用Sink配置

  • HDFS Sink

    HDFS Sink将数据写入Hadoop分布式文件系统(HDFS)。常用配置如下表所示:

    表10 HDFS Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    hdfs

    hdfs sink的类型,必须设置为hdfs。

    hdfs.path

    -

    HDFS上数据存储路径,必须以“hdfs://hacluster/”开头。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    hdfs.inUseSuffix

    .tmp

    正在写入的hdfs文件后缀。

    hdfs.rollInterval

    30

    按时间滚动文件,单位:秒,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollSize

    1024

    按大小滚动文件,单位:bytes,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    hdfs.rollCount

    10

    按Event个数滚动文件,同时需将“hdfs.fileCloseByEndEvent”设置为“false”。

    说明:

    参数“rollInterval”、“rollSize”和“rollCount”可同时配置,三个参数采取优先原则,哪个参数值先满足,优先按照哪个参数进行压缩。

    hdfs.idleTimeout

    0

    自动关闭空闲文件超时时间,单位:秒。

    hdfs.batchSize

    1000

    批次写入HDFS的Event个数。

    hdfs.kerberosPrincipal

    -

    认证HDFS的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    hdfs.kerberosKeytab

    -

    认证HDFS的Kerberos keytab,普通模式集群不配置,安全模式集群中,用户必须对jaas.cof文件中的keyTab路径有访问权限。

    hdfs.fileCloseByEndEvent

    true

    收到源文件的最后一个Event时是否关闭hdfs文件。

    hdfs.batchCallTimeout

    -

    批次写入HDFS超时控制时间,单位:毫秒。

    当不配置此参数时,对每个Event写入HDFS进行超时控制。当“hdfs.batchSize”大于0时,配置此参数可以提升写入HDFS性能。

    说明:

    “hdfs.batchCallTimeout”设置多长时间需要考虑“hdfs.batchSize”的大小,“hdfs.batchSize”越大,“hdfs.batchCallTimeout”也要调整更长时间,设置过短时间容易导致写HDFS失败。

    serializer.appendNewline

    true

    将一个Event写入HDFS后是否追加换行符('\n'),如果追加该换行符,该换行符所占用的数据量指标不会被HDFS Sink统计。

    hdfs.filePrefix

    over_%{basename}

    数据写入hdfs后文件名的前缀。

    hdfs.fileSuffix

    -

    数据写入hdfs后文件名的后缀。

    hdfs.inUsePrefix

    -

    正在写入的hdfs文件前缀。

    hdfs.fileType

    DataStream

    hdfs文件格式,包括“SequenceFile”、“DataStream”以及“CompressedStream”。

    说明:

    “SequenceFile”和“DataStream”不压缩输出文件,不能设置参数“codeC”,“CompressedStream”压缩输出文件,必须设置“codeC”参数值配合使用。

    hdfs.codeC

    -

    文件压缩格式,包括gzip、bzip2、lzo、lzop、snappy。

    hdfs.maxOpenFiles

    5000

    最大允许打开的hdfs文件数,当打开的文件数达到该值时,最早打开的文件将会被关闭。

    hdfs.writeFormat

    Writable

    文件写入格式,“Writable”或者“Text”。

    hdfs.callTimeout

    10000

    写入HDFS超时控制时间,单位:毫秒。

    hdfs.threadsPoolSize

    -

    每个HDFS sink用于HDFS io操作的线程数。

    hdfs.rollTimerPoolSize

    -

    每个HDFS sink用于调度定时文件滚动的线程数。

    hdfs.round

    false

    时间戳是否四舍五入。若设置为true,则会影响所有基于时间的转义序列(%t除外)。

    hdfs.roundUnit

    second

    时间戳四舍五入单位,可选为“second”、“minute”或“hour”,分别对应为秒、分钟和小时。

    hdfs.useLocalTimeStamp

    true

    是否启用本地时间戳,建议设置为“true”。

    hdfs.closeTries

    0

    hdfs sink尝试关闭重命名文件的最大次数。默认为0表示sink会一直尝试重命名,直至重命名成功。

    hdfs.retryInterval

    180

    尝试关闭hdfs文件的时间间隔,单位:秒。

    说明:

    每个关闭请求都会有多个RPC往返Namenode,因此设置的太低可能导致Namenode超负荷。如果设置0,如果第一次尝试失败的话,该Sink将不会尝试关闭文件,并且把文件打开,或者用“.tmp”作为扩展名。

    hdfs.failcount

    10

    数据写入hdfs失败的次数。该参数作为sink写入hdfs失败次数的阈值,当超过该阈值后上报数据传输异常告警。

  • Avro Sink

    Avro Sink把events转化为Avro events并发送到配置的主机的监测端口。常用配置如下表所示:

    表11 Avro Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    avro sink的类型,必须设置为avro。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监测端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    client.type

    DEFAULT

    客户端实例类型,根据所配置的模型实际使用到的通信协议设置。该值可选值包括:

    • DEFAULT,返回AvroRPC类型的客户端实例。
    • OTHER,返回NULL。
    • THRIFT,返回Thrift RPC类型的客户端实例。
    • DEFAULT_LOADBALANCING, 返回LoadBalancing RPC 客户端实例。
    • DEFAULT_FAILOVER, 返回Failover RPC 客户端实例。

    ssl

    false

    是否使用SSL加密。设置为true时还必须指定“密钥(keystore)”和“密钥存储密码(keystore-password)”。

    truststore-type

    JKS

    Java信任库类型,“JKS”或“PKCS12”。

    说明:

    JKS的密钥库和私钥采用不同的密码进行保护,而PKCS12的密钥库和私钥采用相同密码进行保护。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    keystore-type

    JKS

    ssl启用后密钥存储类型。

    keystore

    -

    ssl启用后密钥存储文件路径,开启ssl后,该参数必填。

    keystore-password

    -

    ssl启用后密钥存储密码,开启ssl后,该参数必填。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

    compression-type

    none

    批数据压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。该值必须与AvroSource的compression-type匹配。

    compression-level

    6

    批数据压缩级别(1-9),数值越高,压缩率越高。

    exclude-protocols

    SSLv3

    排除的协议列表,用空格分开。默认排除SSLv3协议。

  • HBase Sink

    HBase Sink将数据写入到HBase中。常用配置如下表所示:

    表12 HBase Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    hbase sink的类型,必须设置为hbase。

    table

    -

    HBase表名称。

    columnFamily

    -

    HBase列族。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    batchSize

    1000

    批次写入HBase的Event个数。

    kerberosPrincipal

    -

    认证HBase的Kerberos principal,普通模式集群不配置,安全模式集群必须配置。

    kerberosKeytab

    -

    认证HBase的Kerberos keytab,普通模式集群不配置,安全模式集群中,flume运行用户必须对jaas.cof文件中的keyTab路径有访问权限。

    coalesceIncrements

    true

    是否在同一个处理批次中,合并对同一个hbase cell多个操作。设置为true有利于提高性能。

  • Kafka Sink

    Kafka Sink将数据写入到Kafka中。常用配置如下表所示:

    表13 Kafka Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    -

    kafka sink的类型,必须设置为org.apache.flume.sink.kafka.KafkaSink。

    kafka.bootstrap.servers

    -

    Kafka 的bootstrap 地址端口列表。如果集群安装有kafka并且配置已经同步,服务端可以不配置此项,默认值为Kafka集群中所有的broker列表,客户端必须配置该项,多个用逗号分隔。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    monTime

    0(不开启)

    线程监控阈值,更新时间超过阈值后,重新启动该Sink,单位:秒。

    kafka.producer.acks

    1

    必须收到多少个replicas的确认信息才认为写入成功。0表示不需要接收确认信息,1表示只等待leader的确认信息。-1表示等待所有的relicas的确认信息。设置为-1,在某些leader失败的场景中可以避免数据丢失。

    kafka.topic

    -

    数据写入的topic,必须填写。

    flumeBatchSize

    1000

    批次写入Kafka的Event个数。

    kafka.security.protocol

    SASL_PLAINTEXT

    Kafka安全协议,普通模式集群下须配置为“PLAINTEXT”。端口和安全协议的匹配规则必须为:21007匹配安全模式(SASL_PLAINTEXT),9092匹配普通模式(PLAINTEXT)。

    ignoreLongMessage

    false

    是否丢弃超大消息的开关。

    messageMaxLength

    1000012

    Flume写入Kafka的消息的最大长度。

    defaultPartitionId

    -

    用于指定channel中的events被传输到哪一个Kafka partition ID ,此值会被partitionIdHeader覆盖。默认情况下,如果此参数不设置,会由Kafka Producer's partitioner 进行events分发(可以通过指定key或者kafka.partitioner.class自定义的partitioner)。

    partitionIdHeader

    -

    设置时,对应的Sink 将从Event 的Header中获取使用此属性的值命名的字段的值,并将消息发送到主题的指定分区。 如果该值无对应的有效分区,则会发生EventDeliveryException。 如果Header 值已经存在,则此设置将覆盖参数defaultPartitionId。

    Other Kafka Producer Properties

    -

    其他Kafka配置,可以接受任意Kafka支持的生产配置,配置需要加前缀 .kafka。

  • Thrift Sink

    Thrift Sink把events转化为Thrift events并发送到配置的主机的监测端口。常用配置如下表所示:

    表14 Thrift Sink常用配置

    参数

    默认值

    描述

    channel

    -

    与之相连的channel。

    type

    thrift

    thrift sink的类型,必须设置为thrift。

    hostname

    -

    绑定的主机名/IP。

    port

    -

    监测端口,该端口需未被占用。

    batch-size

    1000

    批次发送的Event个数。

    connect-timeout

    20000

    第一次连接的超时时间,单位:毫秒。

    request-timeout

    20000

    第一次请求后一次请求的最大超时时间,单位:毫秒。

    kerberos

    false

    是否启用Kerberos认证。

    client-keytab

    -

    客户端使用的keytab文件地址,flume运行用户必须对认证文件具有访问权限。

    client-principal

    -

    客户端使用的安全用户的Principal。

    server-principal

    -

    服务端使用的安全用户的Principal。

    compression-type

    none

    Flume发送数据的压缩类型,“none”或“deflate”,“none”表示不压缩,“deflate”表示压缩。

    maxConnections

    5

    Flume发送数据时的最大连接池大小。

    ssl

    false

    是否使用SSL加密。

    truststore-type

    JKS

    Java信任库类型。

    truststore

    -

    Java信任库文件。

    truststore-password

    -

    Java信任库密码。

    reset-connection-interval

    0

    一次断开连接后,等待多少时间后进行重新连接,单位:秒。默认为0表示不断尝试。

注意事项

  • Flume可靠性保障措施有哪些?
    • Source&Channel、Channel&Sink之间的事务机制。
    • Sink Processor支持配置failover、load_blance机制,例如负载均衡示例如下,详细参考http://flume.apache.org/releases/1.9.0.html
      server.sinkgroups=g1
      server.sinkgroups.g1.sinks=k1 k2
      server.sinkgroups.g1.processor.type=load_balance
      server.sinkgroups.g1.processor.backoff=true
      server.sinkgroups.g1.processor.selector=random
  • Flume多agent聚合级联时的注意事项?
    • 级联时需要使用Avro或者Thrift协议进行级联。
    • 聚合端存在多个节点时,连接配置尽量配置均衡,不要聚合到单节点上。
提示

您即将访问非华为云网站,请注意账号财产安全

文档反馈

文档反馈

意见反馈

0/500

标记内容

同时提交标记内容