更新时间:2024-03-12 GMT+08:00

Flume对接安全Hive指导

操作场景

使用Flume对接集群中的Hive(3.1.0版本)。

本章节适用于MRS 3.x及之后版本。

前置条件

集群正确安装了Flume服务和Hive服务,且服务正常无告警异常。

操作步骤

  1. 使用omm用户将如下jar包导入到需要测试的Flume实例的lib目录下(客户端/服务端),列表如下:

    • antlr-2.7.7.jar
    • antlr-runtime-3.4.jar
    • calcite-core-1.16.0.jar
    • hadoop-mapreduce-client-core-3.1.1.jar
    • hive-beeline-3.1.0.jar
    • hive-cli-3.1.0.jar
    • hive-common-3.1.0.jar
    • hive-exec-3.1.0.jar
    • hive-hcatalog-core-3.1.0.jar
    • hive-hcatalog-***-adapter-3.1.0.jar
    • hive-hcatalog-server-extensions-3.1.0.jar
    • hive-hcatalog-streaming-3.1.0.jar
    • hive-metastore-3.1.0.jar
    • hive-service-3.1.0.jar
    • libfb303-0.9.3.jar
    • hadoop-plugins-1.0.jar

    相关jar包可从Hive安装目录中获取,重启对应的Flume进程,保证jar包加载到运行环境中。

  2. 配置Hive配置项。

    在FusionInsight Manager界面,选择“集群 > 服务 > Hive > 配置 > 全部配置 > HiveServer > 自定义 > hive.server.customized.configs”

    配置项如下:

    名称

    hive.support.concurrency

    true

    hive.exec.dynamic.partition.mode

    nonstrict

    hive.txn.manager

    org.apache.hadoop.hive.ql.lockmgr.DbTxnManager

    hive.compactor.initiator.on

    true

    hive.compactor.worker.threads

    1

  3. 准备具备supergroup和Hive权限的系统用户flume_hive,安装客户端并创建所需的Hive表。

    示例如下:
    1. 正确安装集群客户端,例如安装目录为“/opt/client”。
    2. 执行以下命令完成用户认证。

      cd /opt/client

      source bigdata_env

      kinit flume_hive

    3. 执行beeline命令,然后执行以下建表语句。
      create table flume_multi_type_part(id string, msg string)
      partitioned by (country string, year_month string, day string)
      clustered by (id) into 5 buckets
      stored as orc TBLPROPERTIES('transactional'='true');
    4. 执行select * from 表名命令;,查询表中数据。

      此时表中数据量为0行。

  4. 准备相关配置文件,假设下载的客户端安装包在“/opt/FusionInsight_Cluster_1_Services_ClientConfig”。

    1. 从“${客户端解压目录}/Hive/config”目录获取以下文件:
      • hivemetastore-site.xml
      • hive-site.xml
    2. 从“${客户端解压目录}/HDFS/config”目录下获取以下文件:

      core-site.xml

    3. 在Flume实例启动的机器上创建目录,将准备好的上述文件放置在创建的目录下。

      例如:“/opt/hivesink-conf/hive-site.xml”

    4. “hivemetastore-site.xml”文件中的所有property配置,拷贝至“hive-site.xml”,并保证处于原有配置之前。

      因为hive内部加载有顺序。

      保证配置文件所在的目录对于Flume运行用户omm有读写权限。

  5. 结果观察。

    在Hive的客户端执行,select * from 表名;查看对应的数据是否已经写入到Hive表中。

参考实例

Flume配置参考示例(SpoolDir--Mem--Hive):
server.sources = spool_source
server.channels = mem_channel
server.sinks = Hive_Sink

#config the source
server.sources.spool_source.type = spooldir
server.sources.spool_source.spoolDir = /tmp/testflume
server.sources.spool_source.montime =
server.sources.spool_source.fileSuffix =.COMPLETED
server.sources.spool_source.deletePolicy = never
server.sources.spool_source.trackerDir =.flumespool
server.sources.spool_source.ignorePattern = ^$
server.sources.spool_source.batchSize = 20
server.sources.spool_source.inputCharset =UTF-8
server.sources.spool_source.selector.type = replicating
server.sources.spool_source.fileHeader = false
server.sources.spool_source.fileHeaderKey = file
server.sources.spool_source.basenameHeaderKey= basename
server.sources.spool_source.deserializer = LINE
server.sources.spool_source.deserializer.maxBatchLine= 1
server.sources.spool_source.deserializer.maxLineLength= 2048
server.sources.spool_source.channels = mem_channel

#config the channel
server.channels.mem_channel.type = memory
server.channels.mem_channel.capacity =10000
server.channels.mem_channel.transactionCapacity= 2000
server.channels.mem_channel.channelfullcount= 10
server.channels.mem_channel.keep-alive = 3
server.channels.mem_channel.byteCapacity =
server.channels.mem_channel.byteCapacityBufferPercentage= 20

#config the sink
server.sinks.Hive_Sink.type = hive
server.sinks.Hive_Sink.channel = mem_channel
server.sinks.Hive_Sink.hive.metastore = thrift://${任意metastore业务IP}:21088
server.sinks.Hive_Sink.hive.hiveSite = /opt/hivesink-conf/hive-site.xml
server.sinks.Hive_Sink.hive.coreSite = /opt/hivesink-conf/core-site.xml
server.sinks.Hive_Sink.hive.metastoreSite = /opt/hivesink-conf/hivemeatastore-site.xml
server.sinks.Hive_Sink.hive.database = default
server.sinks.Hive_Sink.hive.table = flume_multi_type_part
server.sinks.Hive_Sink.hive.partition = Tag,%Y-%m,%d
server.sinks.Hive_Sink.hive.txnsPerBatchAsk= 100
server.sinks.Hive_Sink.hive.autoCreatePartitions= true
server.sinks.Hive_Sink.useLocalTimeStamp = true
server.sinks.Hive_Sink.batchSize = 1000
server.sinks.Hive_Sink.hive.kerberosPrincipal= super1
server.sinks.Hive_Sink.hive.kerberosKeytab= /opt/mykeytab/user.keytab
server.sinks.Hive_Sink.round = true
server.sinks.Hive_Sink.roundValue = 10
server.sinks.Hive_Sink.roundUnit = minute
server.sinks.Hive_Sink.serializer = DELIMITED
server.sinks.Hive_Sink.serializer.delimiter= ";"
server.sinks.Hive_Sink.serializer.serdeSeparator= ';'
server.sinks.Hive_Sink.serializer.fieldnames= id,msg