更新时间:2024-11-29 GMT+08:00
Flume对接安全Hive指导
操作场景
使用Flume对接集群中的Hive(3.1.0版本)。
前置条件
集群正确安装了Flume服务和Hive服务,且服务正常无告警异常。
操作步骤
- 使用omm用户将如下jar包导入到需要测试的Flume实例的lib目录下(客户端/服务端),列表如下:
- antlr-版本号.jar
- antlr-runtime-版本号.jar
- calcite-core-版本号.jar
- hadoop-mapreduce-client-core-版本号.jar
- hive-beeline-版本号.jar
- hive-cli-版本号.jar
- hive-common-版本号.jar
- hive-exec-版本号.jar
- hive-hcatalog-core-版本号.jar
- hive-hcatalog-pig-adapter-版本号.jar
- hive-hcatalog-server-extensions-版本号.jar
- hive-hcatalog-streaming-版本号.jar
- hive-metastore-版本号.jar
- hive-service-版本号.jar
- libfb303-版本号.jar
- hadoop-plugins-版本号.jar
相关jar包可从Hive安装目录中获取,重启对应的Flume进程,保证jar包加载到运行环境中。
- 配置Hive配置项。
在FusionInsight Manager界面,选择“集群 > 服务 > Hive > 配置 > 全部配置 > HiveServer(角色) > 自定义 > hive.server.customized.configs”,添加如下参数:
表1 hive.server.customized.configs参数值配置 名称
值
hive.support.concurrency
true
hive.exec.dynamic.partition.mode
nonstrict
hive.txn.manager
org.apache.hadoop.hive.ql.lockmgr.DbTxnManager
hive.compactor.initiator.on
true
hive.compactor.worker.threads
1
- 准备具备supergroup和Hive权限的系统用户flume_hive,安装客户端并创建所需的Hive表。
示例如下:
- 正确安装集群客户端,例如安装目录为“/opt/client”。
- 执行以下命令完成用户认证。
source bigdata_env
kinit flume_hive
- 执行beeline命令,然后执行以下建表语句。
create table flume_multi_type_part(id string, msg string) partitioned by (country string, year_month string, day string) clustered by (id) into 5 buckets stored as orc TBLPROPERTIES('transactional'='true');
- 执行select * from 表名命令;,查询表中数据。
此时表中数据量为0行。
- 准备相关配置文件,假设下载的客户端安装包在“/opt/FusionInsight_Cluster_1_Services_ClientConfig”。
- 从“${客户端解压目录}/Hive/config”目录获取以下文件:
- hivemetastore-site.xml
- hive-site.xml
- 从“${客户端解压目录}/HDFS/config”目录下获取以下文件:
core-site.xml
- 在Flume实例启动的机器上创建目录,将准备好的上述文件放置在创建的目录下。
- 将“hivemetastore-site.xml”文件中的所有property配置,拷贝至“hive-site.xml”,并保证处于原有配置之前。
因为hive内部加载有顺序。
保证配置文件所在的目录对于Flume运行用户omm有读写权限。
- 从“${客户端解压目录}/Hive/config”目录获取以下文件:
- 结果观察。
在Hive的客户端执行,select * from 表名;查看对应的数据是否已经写入到Hive表中。
参考实例
Flume配置参考示例(SpoolDir--Mem--Hive):
server.sources = spool_source server.channels = mem_channel server.sinks = Hive_Sink #config the source server.sources.spool_source.type = spooldir server.sources.spool_source.spoolDir = /tmp/testflume server.sources.spool_source.montime = server.sources.spool_source.fileSuffix =.COMPLETED server.sources.spool_source.deletePolicy = never server.sources.spool_source.trackerDir =.flumespool server.sources.spool_source.ignorePattern = ^$ server.sources.spool_source.batchSize = 20 server.sources.spool_source.inputCharset =UTF-8 server.sources.spool_source.selector.type = replicating server.sources.spool_source.fileHeader = false server.sources.spool_source.fileHeaderKey = file server.sources.spool_source.basenameHeaderKey= basename server.sources.spool_source.deserializer = LINE server.sources.spool_source.deserializer.maxBatchLine= 1 server.sources.spool_source.deserializer.maxLineLength= 2048 server.sources.spool_source.channels = mem_channel #config the channel server.channels.mem_channel.type = memory server.channels.mem_channel.capacity =10000 server.channels.mem_channel.transactionCapacity= 2000 server.channels.mem_channel.channelfullcount= 10 server.channels.mem_channel.keep-alive = 3 server.channels.mem_channel.byteCapacity = server.channels.mem_channel.byteCapacityBufferPercentage= 20 #config the sink server.sinks.Hive_Sink.type = hive server.sinks.Hive_Sink.channel = mem_channel server.sinks.Hive_Sink.hive.metastore = thrift://${任意metastore业务IP}:21088 server.sinks.Hive_Sink.hive.hiveSite = /opt/hivesink-conf/hive-site.xml server.sinks.Hive_Sink.hive.coreSite = /opt/hivesink-conf/core-site.xml server.sinks.Hive_Sink.hive.metastoreSite = /opt/hivesink-conf/hivemeatastore-site.xml server.sinks.Hive_Sink.hive.database = default server.sinks.Hive_Sink.hive.table = flume_multi_type_part server.sinks.Hive_Sink.hive.partition = Tag,%Y-%m,%d server.sinks.Hive_Sink.hive.txnsPerBatchAsk= 100 server.sinks.Hive_Sink.hive.autoCreatePartitions= true server.sinks.Hive_Sink.useLocalTimeStamp = true server.sinks.Hive_Sink.batchSize = 1000 server.sinks.Hive_Sink.hive.kerberosPrincipal= super1 server.sinks.Hive_Sink.hive.kerberosKeytab= /opt/mykeytab/user.keytab server.sinks.Hive_Sink.round = true server.sinks.Hive_Sink.roundValue = 10 server.sinks.Hive_Sink.roundUnit = minute server.sinks.Hive_Sink.serializer = DELIMITED server.sinks.Hive_Sink.serializer.delimiter= ";" server.sinks.Hive_Sink.serializer.serdeSeparator= ';' server.sinks.Hive_Sink.serializer.fieldnames= id,msg
父主题: 使用Flume