运行Flink作业
Flink作业是基于Flink框架开发的分布式数据处理任务,主要用于流式数据处理和有状态计算。Flink是一个面向流处理和批处理的统一计算框架,其作业以流为核心(批处理可视为有限流),支持高吞吐、低延迟、精准语义的实时数据处理,广泛应用于实时监控、日志分析、金融交易等场景。
用户可将自己开发的程序提交到MRS中,执行程序并获取结果,本章节指导您如何在MRS集群中提交一个Flink作业。
用户可以在MRS管理控制台在线创建一个作业并提交运行,也可以通过MRS集群客户端来以命令行形式提交作业。
前提条件
- 用户已经将作业所需的程序包和数据文件上传至OBS或HDFS文件系统中。
- 如果作业程序需要读取以及分析OBS文件系统中的数据,需要先配置MRS集群的存算分离,请参考配置MRS集群存算分离。
约束与限制
- 当IAM用户的用户组的所属策略从MRS ReadOnlyAccess向MRS CommonOperations、MRS FullAccess、MRS Administrator变化时,或者反之从MRS CommonOperations、MRS FullAccess、MRS Administrator向MRS ReadOnlyAccess变化时,由于集群节点的SSSD(System Security Services Daemon)缓存刷新需要时间,因此用户同步完成后,请等待5分钟,待新策略生效之后,再到MRS管理控制台在线提交作业,否则会出现提交作业失败的情况。
- 当前IAM用户名中存在空格时(例如admin 01),不支持通过MRS管理控制台添加作业。
操作视频
本视频以在未开启Kerberos认证的MRS集群的管理控制台上,提交用于处理OBS存储的数据的Flink作业为例进行说明。
因不同版本操作界面可能存在差异,相关视频供参考,具体以实际环境为准。
提交作业
用户可通过管理控制台在线创建并运行作业,也可以通过集群客户端命令方式手动提交。
- 准备应用程序及数据。
在本章节中,以提交一个Flink单词统计应用为例进行介绍,该样例程序可以从MRS集群客户端中获取(“客户端安装目录/Flink/flink/examples/batch/WordCount.jar”),然后上传至HDFS或者OBS的指定目录中,请参考上传应用数据至MRS集群。
该应用运行时,需要的输入参数如下:
- input:待分析的数据文件,需提前上传至HDFS或者OBS文件系统中。
This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.
- output:进行单词统计后,结果输出文件。
- input:待分析的数据文件,需提前上传至HDFS或者OBS文件系统中。
- 登录MRS管理控制台。
- 选择“现有集群”,选中一个运行中的集群并单击集群名称,进入集群概览信息页面。
- 在“概览”页签中,单击“IAM用户同步”右侧的“同步”进行IAM用户同步。
集群开启Kerberos认证时需执行该步骤,若集群未开启Kerberos认证,无需执行本步骤。
IAM用户同步完成后,请等待5分钟,再进行提交作业,更多IAM用户同步说明请参考IAM用户同步MRS集群说明。
- 单击“作业管理”,在作业列表界面单击“添加”。
- “作业类型”选择“Flink”并参考表1配置Flink作业信息。
图1 添加Flink作业
表1 作业配置信息 参数
描述
示例
作业名称
作业名称,只能由字母、数字、中划线和下划线组成,并且长度为1~64个字符。
flink_job
执行程序路径
待执行程序包地址,可直接手动输入地址路径,也可单击“HDFS”或者“OBS”后选择文件。
- 最多为1023字符,不能包含;|&>,<'$特殊字符,且不可为空或全空格。
- OBS程序路径地址以“obs://”开头,HDFS程序路径地址以“hdfs://hacluster”开头,例如“hdfs://hacluster/user/XXX.jar”。
- Flink作业执行程序需要以“.jar”结尾。
obs://mrs-demotest/program/WordCount.jar
运行程序参数
可选参数,为本次执行的作业配置相关优化参数(例如线程、内存、CPU核数等),用于优化资源使用效率,提升作业的执行性能。
Flink作业常用运行程序参数如表2所示,可根据执行程序及集群资源情况进行配置,若不配置将使用集群默认值。
-
执行程序参数
可选参数,程序执行的关键参数,该参数由用户程序内的函数指定,MRS只负责参数的传入。
多个参数间使用空格隔开,最多为150000字符,不能包含;|&><'$特殊字符,可为空。
注意:用户输入带有敏感信息(如登录密码)的参数时,可通过在参数名前添加“@”的方式为该参数值加密,以防止敏感信息被明文形式持久化。
在MRS管理控制台查看作业信息时,敏感信息会显示为“*”。
例如:username=testuser @password=用户密码
--input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
服务配置参数
可选参数,用于为本次执行的作业修改服务配置参数。
该参数的修改仅适用于本次执行的作业,如需对集群永久生效,请参考修改MRS集群组件配置参数进行集群组件配置参数的修改。
例如在MRS集群未配置存算分离的场景下,作业需要通过AK/SK方式访问OBS,可增加以下服务配置参数:
- fs.obs.access.key:访问OBS的密钥ID。
- fs.obs.secret.key:访问OBS与密钥ID对应的密钥。
-
命令参考
用于展示提交作业时提交到后台执行的命令。
N/A
- 确认作业配置信息,单击“确定”,完成作业的新增。
- 作业提交成功后,可在作业列表中查看作业运行状态及执行结果,等待作业状态变为“已完成”,可查看相关程序分析结果。
在本示例程序中,单击“查看日志”,可查看Flink作业的详细执行过程。
作业执行完成后,在指定的结果输出目录下,可查看具体的统计结果。
例如“obs://mrs-demotest/output.txt”文件内容如下:
a 3 and 2 batch 1 both 1 computing 2 data 2 demo 1 distribution 1 engine 1 flink 2 for 1 framework 1 is 2 it 1 mrs 1 parallel 1 processing 3 provides 1 stream 2 supports 2 test 1 that 2 this 1 unified 1
- 准备应用程序及数据。
在本章节中,以提交一个Flink单词统计应用为例进行介绍,该样例程序可以从MRS集群客户端中获取(“客户端安装目录/Flink/flink/examples/batch/WordCount.jar”),然后上传至HDFS或者OBS的指定目录中,请参考上传应用数据至MRS集群。
该应用运行时,需要的输入参数如下:
- input:待分析的数据文件,需提前上传至HDFS或者OBS文件系统中。
This is a test demo for MRS Flink. Flink is a unified computing framework that supports both batch processing and stream processing. It provides a stream data processing engine that supports data distribution and parallel computing.
- output:进行单词统计后,结果输出文件。
- input:待分析的数据文件,需提前上传至HDFS或者OBS文件系统中。
- 如果当前集群已开启Kerberos认证,需提前在Manager界面中创建一个具有对应作业提交权限的业务用户,请参考创建MRS集群用户。
- 本示例中,创建一个人机用户testuser,关联用户组“supergroup”及角色“System_administrator”。
- 在集群的Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”。
- 将下载的认证凭据压缩包解压缩,并将得到的文件复制到客户端节点中,例如客户端节点的“/opt/flinkclient/Flink/flink/conf”目录下。如果是在集群外节点安装的客户端,需要将得到的文件复制到该节点的“/etc/”目录下。
- 安装MRS集群客户端。
具体操作可参考安装MRS集群客户端,例如安装目录为“/opt/flinkclient”。
- 使用MRS集群客户端安装用户登录客户端所在的节点。
具体操作可参考登录MRS集群节点。
- 执行以下命令进入客户端安装目录。
cd /opt/flinkclient
加载环境变量:
source bigdata_env
- 如果当前集群已开启Kerberos认证,修改Flink相关安全配置参数。
- MRS 3.x及之后版本,已开启Kerberos认证的集群中,需要将客户端安装节点和集群Manager的浮动IP地址追加到“flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
vi Flink/flink/conf/flink-conf.yaml
例如修改如下:
jobmanager.web.allow-access-address: 192.168.0.219,192.168.0.153,192.168.0.47,192.168.0.127
集群Manager的浮动IP地址可在集群的OMS节点中执行ifconfig命令查看“ethX:wsom”对应的IP地址。
- 继续在“flink-conf.yaml”配置文件搜索并配置keytab路径以及用户名参数。
security.kerberos.login.keytab: /opt/flinkclient/Flink/flink/conf/user.keytab security.kerberos.login.principal: testuser
- 执行如下命令进行Flink安全加固。
cd Flink/flink/bin
执行以下命令,根据界面提示设置一个自定义密码。该脚本会自动替换“客户端目录/Flink/flink/conf/flink-conf.yaml”中关于SSL的相关配置值,针对MRS 3.x之前版本,安全集群默认没有开启外部SSL,用户如果需要启用外部SSL,进行配置后再次运行该脚本即可,配置参数如表3所示。
sh generate_keystore.sh
表3 参数描述 参数
描述
参数值示例
security.ssl.rest.enabled
打开外部SSL开关。
true
security.ssl.rest.keystore
keystore的存放路径。
${path}/flink.keystore
security.ssl.rest.keystore-password
keystore的password,“123456”表示需要用户输入自定义设置的密码值。
123456
security.ssl.rest.key-password
ssl key的password,“123456”表示需要用户输入自定义设置的密码值。
123456
security.ssl.rest.truststore
truststore存放路径。
${path}/flink.truststore
security.ssl.rest.truststore-password
truststore的password,“123456”表示需要用户输入自定义设置的密码值。
123456
- 配置flink.keystore和flink.truststore文件路径为相对路径。
- 在“Flink/flink/conf/”目录下新建目录,例如ssl。
mkdir /opt/flinkclient/Flink/flink/conf/ssl
- 移动flink.keystore和flink.truststore文件到“/opt/Bigdata/client/Flink/flink/conf/ssl/”中。
mv flink.keystore /opt/flinkclient/Flink/flink/conf/ssl/
mv flink.truststore /opt/flinkclient/Flink/flink/conf/ssl/
- 针对MRS 3.x及之后版本,修改flink-conf.yaml文件中如下两个参数为相对路径。
security.ssl.keystore: ssl/flink.keystore security.ssl.truststore: ssl/flink.truststore
- 针对MRS 3.x之前版本,修改flink-conf.yaml文件中如下两个参数为相对路径。
security.ssl.internal.keystore: ssl/flink.keystore security.ssl.internal.truststore: ssl/flink.truststore
- 在“Flink/flink/conf/”目录下新建目录,例如ssl。
- 如果集群客户端安装在集群外节点,在配置文件“flink-conf.yaml”中追加如下配置值,配置客户端所在节点的IP地址。
web.access-control-allow-origin: 192.168.0.150 jobmanager.web.allow-access-address: 192.168.0.150
- MRS 3.x及之后版本,已开启Kerberos认证的集群中,需要将客户端安装节点和集群Manager的浮动IP地址追加到“flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
- 运行Flink作业。
本章节以提交客户端自带的WordCount样例程序为例。
- 安全集群(开启Kerberos认证)
- 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业。
例如“ssl”所在目录是“/opt/flinkclient/Flink/flink/conf/”,则在“/opt/flinkclient/Flink/flink/conf/”目录下执行命令:
yarn-session.sh -t ssl/ -nm "session-name"
重新打开一个客户端连接窗口,执行以下命令初始化环境变量:
source /opt/flinkclient/bigdata_env
执行以下命令提交作业:
flink run /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- 执行如下命令在Yarn上提交单个作业。
flink run -m yarn-cluster /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业。
- 普通集群(未开启Kerberos认证)
- 在“/opt/flinkclient/Flink/flink/conf/”目录下执行如下命令启动session,并在session中提交作业。
yarn-session.sh -nm "session-name" -d
重新打开一个客户端连接窗口,执行以下命令提交作业:
flink run /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- 执行如下命令在Yarn上提交单个作业。
flink run -m yarn-cluster /opt/flinkclient/Flink/flink/examples/batch/WordCount.jar --input obs://mrs-demotest/input/data2.txt --output obs://mrs-demotest/output.txt
- 在“/opt/flinkclient/Flink/flink/conf/”目录下执行如下命令启动session,并在session中提交作业。
- 安全集群(开启Kerberos认证)
- 使用testuser用户登录集群Manager页面,选择“集群 > 服务 > Yarn”,单击“ResourceManager Web UI”右侧的超链接进入Yarn WebUI页面,单击对应作业的Application ID,即可查看作业运行信息及相关日志。
图2 查看Flink作业详情
- 作业执行完成后,在指定的结果输出目录下,可查看具体的统计结果。
例如“obs://mrs-demotest/output.txt”文件内容如下:
a 3 and 2 batch 1 both 1 computing 2 data 2 demo 1 distribution 1 engine 1 flink 2 for 1 framework 1 is 2 it 1 mrs 1 parallel 1 processing 3 provides 1 stream 2 supports 2 test 1 that 2 this 1 unified 1
相关文档
- 通过管理控制台提交作业后,每一条作业都支持查看日志,具体操作请参见查看MRS作业详情和日志。
- 开启Kerberos认证的集群在提交作业时,未进行IAM用户同步报错处理方法请参见提交作业时系统提示当前用户在Manager不存在如何处理?。
- 作业提交后,查看指定Yarn任务日志的具体操作请参见如何查看指定Yarn任务的日志?。