更新时间:2024-11-29 GMT+08:00

从零开始使用Flume

操作场景

Flume支持将采集的日志信息导入到Kafka。

前提条件

  • 已创建开启Kerberos认证的包含Flume、Kafka等组件的流式集群。
  • 已配置网络,使日志生成节点与流集群互通。
  • 已创建人机用户如test1,根据需求添加“hadoop”、“yarnviewgroup”、“hadooppmanager”和“kafkaadmin”用户组,并添加“System_administrator”和“default”角色(首次创建的用户需使用该用户登录Manager修改密码)。

使用Flume客户端

  1. 安装Flume客户端。

    可参考安装Flume客户端在日志生成节点安装Flume客户端,例如安装目录为“/opt/Flumeclient”。以下操作的客户端目录只是举例,请根据实际安装目录修改。

  2. 若集群开启Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。

    1. 下载用户认证凭据。

      登录Manager,选择“系统 > 权限 > 用户”,搜索已创建的用户test1,选择“更多>下载认证凭据”,根据界面提示下载认证凭据到本地,解压获得“krb5.conf”和“user.keytab”文件。

    2. 2.a获取的“krb5.conf”和“user.keytab”文件拷贝到Flume客户端节点的“Flume客户端安装目录/fusioninsight-flume-Flume组件版本号/conf”目录下。
    3. 登录安装Flume客户端节点,在“Flume客户端安装目录/fusioninsight-flume-Flume组件版本号/conf”,执行以下命令创建“jaas.conf”文件并保存:

      cd /opt/FlumeClient/fusioninsight-flume-Flume组件版本号/conf/

      vi jaas.conf

      KafkaClient {
      com.sun.security.auth.module.Krb5LoginModule required
      useKeyTab=true
      keyTab="/opt/FlumeClient/fusioninsight-flume-Flume组件版本号/conf/user.keytab"
      principal="test1"
      useTicketCache=false
      storeKey=true
      debug=true;
      };
      • keyTab:用户认证文件完整路径,即2.b中保存用户认证文件的目录。
      • 首次认证配置完成后,若更换用户或认证文件,需重新配置用户认证,并重启“Flume”实例。

  3. 根据实际业务场景配置作业。

    • 部分参数可直接在Manager界面配置。
    • 在“properties.properties”文件中配置,以配置SpoolDir Source+File Channel+Kafka Sink为例。

      在安装Flume客户端的节点执行以下命令,根据实际业务需求,在Flume客户端配置文件“properties.properties”中配置并保存作业。

      vi Flume客户端安装目录/fusioninsight-flume-Flume组件版本号/conf/properties.properties

      #########################################################################################
      client.sources = static_log_source  
      client.channels = static_log_channel 
      client.sinks = kafka_sink
      #########################################################################################
      #LOG_TO_HDFS_ONLINE_1
      
      client.sources.static_log_source.type = spooldir
      client.sources.static_log_source.spoolDir = 监控目录
      client.sources.static_log_source.fileSuffix = .COMPLETED
      client.sources.static_log_source.ignorePattern = ^$
      client.sources.static_log_source.trackerDir = 传输过程中元数据存储路径
      client.sources.static_log_source.maxBlobLength = 16384
      client.sources.static_log_source.batchSize = 51200
      client.sources.static_log_source.inputCharset = UTF-8
      client.sources.static_log_source.deserializer = LINE
      client.sources.static_log_source.selector.type = replicating
      client.sources.static_log_source.fileHeaderKey = file
      client.sources.static_log_source.fileHeader = false
      client.sources.static_log_source.basenameHeader = true
      client.sources.static_log_source.basenameHeaderKey = basename
      client.sources.static_log_source.deletePolicy = never
      
      client.channels.static_log_channel.type = file
      client.channels.static_log_channel.dataDirs = 数据缓存路径,设置多个路径可提升性能,中间用逗号分开
      client.channels.static_log_channel.checkpointDir = 检查点存放路径
      client.channels.static_log_channel.maxFileSize = 2146435071
      client.channels.static_log_channel.capacity = 1000000
      client.channels.static_log_channel.transactionCapacity = 612000
      client.channels.static_log_channel.minimumRequiredSpace = 524288000
      
      client.sinks.kafka_sink.type = org.apache.flume.sink.kafka.KafkaSink
      client.sinks.kafka_sink.kafka.topic = 数据写入的topic,如flume_test
      client.sinks.kafka_sink.kafka.bootstrap.servers = XXX.XXX.XXX.XXX:kafka端口号,XXX.XXX.XXX.XXX:kafka端口号,XXX.XXX.XXX.XXX:kafka端口号
      client.sinks.kafka_sink.flumeBatchSize = 1000
      client.sinks.kafka_sink.kafka.producer.type = sync
      client.sinks.kafka_sink.kafka.security.protocol = SASL_PLAINTEXT
      client.sinks.kafka_sink.kafka.kerberos.domain.name = Kafka Domain名称,安全集群必填,如hadoop.xxx.com
      client.sinks.kafka_sink.requiredAcks = 0
      
      client.sources.static_log_source.channels = static_log_channel
      client.sinks.kafka_sink.channel = static_log_channel
      • client.sinks.kafka_sink.kafka.topic:数据写入的topic。若kafka中该topic不存在,默认情况下会自动创建该topic。
      • client.sinks.kafka_sink.kafka.bootstrap.servers:Kafkabrokers列表,多个用英文逗号分隔。默认情况下,安全集群端口21007,普通集群对应端口9092。
      • client.sinks.kafka_sink.kafka.security.protocol:安全集群为SASL_PLAINTEXT,普通集群为PLAINTEXT。
      • client.sinks.kafka_sink.kafka.kerberos.domain.name:

        普通集群无需配置此参数。安全集群对应此参数的值为Kafka集群中“kerberos.domain.name”对应的值。

        具体可到Broker实例所在节点上查看“${BIGDATA_HOME}/FusionInsight_Current/1_X_Broker/etc/server.properties”

  4. 参数配置并保存后,Flume客户端将自动加载“properties.properties”中配置的内容。当spoolDir生成新的日志文件,文件内容将发送到Kafka生产者,并支持Kafka消费者消费。