FlinkServer对接HDFS
操作场景
本章节介绍HDFS作为sink表的DDL定义,以及创建sink表时使用的WITH参数和代码示例,并指导如何在FlinkServer作业管理页面操作。
本示例以安全模式Kafka为例。
前提条件
- 集群中已安装HDFS、Yarn、Flink服务。
- 包含HDFS服务的客户端已安装,安装路径如:/opt/client。
- 参考创建FlinkServer角色创建一个具有FlinkServer管理员权限的用户用于访问Flink WebUI,如:flink_admin。
操作步骤
- 使用flink_admin登录Manager,选择“集群 > 服务 > Flink”,在“Flink WebUI”右侧,单击链接,访问Flink的WebUI。
- 参考新建作业,新建Flink SQL流作业,参考如下内容在作业开发界面进行作业开发,配置完成后启动作业。
需勾选“基础参数”中的“开启CheckPoint”,“时间间隔(ms)”可设置为“60000”,“模式”可使用默认值。
CREATE TABLE kafka_table ( user_id STRING, order_amount DOUBLE, log_ts TIMESTAMP(3), WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND ) WITH ( 'connector' = 'kafka', 'topic' = 'user_source', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup', 'scan.startup.mode' = 'latest-offset', 'format' = 'csv', --跳过解析失败的csv数据 'csv.ignore-parse-errors' = 'true',--如果是json数据格式,设置'json.ignore-parse-errors' = 'true' 'properties.sasl.kerberos.service.name' = 'kafka', 'properties.security.protocol' = 'SASL_PLAINTEXT', 'properties.kerberos.domain.name' = 'hadoop.系统域名' ); CREATE TABLE fs_table ( user_id STRING, order_amount DOUBLE, dt STRING, `hour` STRING ) PARTITIONED BY (dt, `hour`) WITH ( --根据日期进行文件分区 'connector'='filesystem', 'path'='hdfs://hacluster/tmp/parquet', 'format'='parquet', 'sink.partition-commit.delay'='0 s',--该延迟时间之前分区不会被提交。如果是按天分区,可以设置为'1 d',如果是按小时分区,应设置为'1 h' 'sink.partition-commit.policy.kind'='success-file' ); -- streaming sql, insert into file system table INSERT INTO fs_table SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd'), DATE_FORMAT(log_ts, 'HH') FROM kafka_table;
Kafka端口号:
- 集群已启用Kerberos认证(安全模式)时Broker端口为“sasl.port”参数的值,默认为“21007”。
- 集群未启用Kerberos认证(普通模式)时Broker端口为“port”的值,默认为“9092”。如果配置端口号为9092,则需要配置“allow.everyone.if.no.acl.found”参数为true,具体操作如下:
登录FusionInsight Manager系统,选择“集群 > 服务 > Kafka > 配置 > 全部配置”,搜索“allow.everyone.if.no.acl.found”配置,修改参数值为true,保存配置即可。
- 系统域名:可登录FusionInsight Manager,选择“系统 > 权限 > 域和互信”,查看“本端域”参数,即为当前系统域名。
- 查看作业管理界面,作业状态为“运行中”。
- 参考管理Kafka主题中的消息,查看Topic并向Kafka中写入数据。
./kafka-topics.sh --list --zookeeper ZooKeeper的quorumpeer实例业务IP:ZooKeeper客户端端口号/kafka
sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic 主题名称 --producer.config 客户端目录/Kafka/kafka/config/producer.properties
例如本示例使用主题名称为user_source:sh kafka-console-producer.sh --broker-list Kafka角色实例所在节点的IP地址:Kafka端口号 --topic user_source --producer.config /opt/client/Kafka/kafka/config/producer.properties
输入消息内容:3,3333,"2021-09-10 14:00" 4,4444,"2021-09-10 14:01"
输入完成后按回车发送消息。
- 执行以下命令查看Sink表中是否接收到数据,即HDFS目录是否正常写入文件。
hdfs dfs -ls -R /sql/parquet
Flink对接HDFS分区
- Flink对接HDFS支持自定义分区。
Flink文件系统分区支持使用标准的Hive格式。不需要将分区预先注册到表目录中,分区是根据目录结构推断。
例如,根据下面的目录分区的表将被推断为包含日期时间和小时分区。path └── datetime=2021-09-03 └── hour=11 ├── part-0.parquet ├── part-1.parquet └── hour=12 ├── part-0.parquet └── datetime=2021-09-24 └── hour=6 ├── part-0.parquet
- 分区文件的滚动策略。
分区目录中的数据被拆分为part文件,每个分区将至少包含一个part文件,用于接收sink的子任务的数据写入。
如下参数介绍分区文件如何进行滚动。配置项
默认值
类型
描述
sink.rolling-policy.file-size
128MB
MemorySize
分区文件达到该阈值后,进行滚动。
sink.rolling-policy.rollover-interval
30min
Duration
分区文件在滚动前可以保持打开的最长持续时间。
sink.rolling-policy.check-interval
1min
Duration
检查基于时间的滚动策略的时间间隔。
- 分区目录的文件合并。
- 分区文件的提交。
文件写入分区后,通常需要通知下游应用程序。如将分区添加到Hive元存储中,或在目录中写入_SUCCESS文件。分区文件的提交操作基于触发器和策略的组合方式。
- 分区文件提交触发器相关配置
配置项
默认值
类型
描述
sink.partition-commit.trigger
process-time
String
- process-time:基于计算节点的系统时间,它既不需要分区时间提取,也不需要生成watermark。即“当前系统时间”超过“分区创建时的系统时间”加上“延迟”时间,就提交分区。
- partition-time:基于从分区提取的时间,它需要生成watermark。即“watermark时间”超过“从分区提取的时间”加上“延迟”时间,就提交分区。
sink.partition-commit.delay
0 s
Duration
分区在延迟时间之前不会提交。如果是每日分区,则应为“1 d”,如果是每小时分区,则应为“1 h”。
- 分区问文件提交策略相关配置
配置项
默认值
类型
描述
sink.partition-commit.policy.kind
-
String
提交分区的策略。
- metastore:将分区添加到元存储。只有hive表支持元存储策略,文件系统通过目录结构管理分区。
- success-file:将success-file文件添加到目录中。
- 两者可以同时配置,即:'sink.partition-commit.policy.kind'='metastore,success-file'。
sink.partition-commit.policy.class
-
String
用于实现分区提交策略接口的分区提交策略类。
仅在自定义提交策略中生效。
sink.partition-commit.success-file.name
_SUCCESS
String
success-file分区提交策略的文件名,默认值为_SUCCESS。
- 分区文件提交触发器相关配置