从零开始使用Flink
操作场景
本章节提供一个使用Flink运行wordcount作业的操作入门指导。
前提条件
- MRS集群中已安装Flink组件且集群内各组件正常运行。
- 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。
操作步骤
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd /opt/hadoopclient
- 执行如下命令初始化环境变量。
source /opt/hadoopclient/bigdata_env
- 若集群开启了Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。
- 准备一个用于提交Flink作业的用户,例如test。
登录Manager,选择“系统 > 权限 > 角色”,单击“添加角色”,输入角色名称与描述。在“配置资源权限”的表格中选择“待操作集群的名称 > Flink”,勾选“FlinkServer管理操作权限”,单击“确定”,返回角色管理。
选择“系统 > 权限 > 用户”,单击“添加用户”,输入用户名、密码等,用户类型选择“人机”,用户组根据需求添加“hadoop”、“yarnviewgroup”和“hadooppmanager”,并添加“System_administrator”、“default”和创建的角色,单击“确定”完成Flink作业用户创建(首次创建的用户需使用该用户登录Manager修改密码)。
用户在Flink提交作业或者运行作业时,需根据涉及的相关服务(如HDFS、Kafka等)是否启用Ranger鉴权,使该用户应具有如下权限:
- 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”目录的读写权限。
- 如果停用Ranger鉴权,当前用户必须属于hadoop组。
- 登录FusionInsight Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”,下载用户对应的认证凭据文件到本地并解压。
- 将解压得到的“user.keytab”、“krb5.conf”文件拷贝到客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。
- 登录客户端节点,将客户端节点的业务IP地址以及Manager的浮动IP地址追加到“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
- 配置安全认证。
在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
security.kerberos.login.keytab: <user.keytab文件路径> security.kerberos.login.principal: <用户名>
例如:
security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab security.kerberos.login.principal: test
- 参考认证和加密进行安全加固配置,执行如下命令,并设置为一个用于提交作业的密码。
cd /opt/hadoopclient/Flink/flink/bin
sh generate_keystore.sh
该脚本会自动替换“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中关于SSL的相关配置参数值。
- 配置客户端访问“flink.keystore”和“flink.truststore”文件的路径配置。
- 绝对路径
执行“generate_keystore.sh”脚本后,默认在“flink-conf.yaml”文件中将“flink.keystore”和“flink.truststore”文件路径自动配置为绝对路径,此时需要将“conf”目录中的“flink.keystore”和“flink.truststore”文件分别放置在Flink客户端以及Yarn各个节点的该绝对路径上。
- 相对路径(推荐)
请执行如下步骤配置“flink.keystore”和“flink.truststore”文件路径为相对路径,并确保Flink客户端执行命令的目录可以直接访问该相对路径。
- 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如“ssl”。
cd /opt/hadoopclient/Flink/flink/conf/
mkdir ssl
- 移动“flink.keystore”和“flink.truststore”文件到新建目录中。
mv flink.keystore ssl/
mv flink.truststore ssl/
- 修改“flink-conf.yaml”文件中如下两个参数为相对路径。
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
security.ssl.keystore: ssl/flink.keystore security.ssl.truststore: ssl/flink.truststore
- 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如“ssl”。
- 绝对路径
- 准备一个用于提交Flink作业的用户,例如test。
- 运行wordcount作业。
作业提交有如下模式:
- Session模式
该模式会在“客户端安装路径/Flink/tmp/.yarn-properties-<username>”中创建一个YARN属性文件,在提交作业时,会将作业提交到该文件所记录的applicationID上。作业结束后Flink集群不会关闭,Session模式下分如下两种模式提交作业:
- Application模式
该模式会在YARN上启动一个Flink集群,其中应用程序jar的main()方法在YARN中的JobManager上执行,可使用HDFS上的依赖包。应用程序完成后,集群将立即关闭。也可以使用yarn application -kill <ApplicationId>或通过取消Flink作业手动停止集群,作业结束后Flink集群也会关闭。
- Per-Job Cluster模式
该模式会在YARN上启动一个Flink集群,然后在本地运行提供的应用程序jar,最后将JobGraph提交给YARN上的JobManager。如果使用--detached参数,客户端将在作业提交后停止,作业结束后Flink集群也会关闭。
- yarn-cluster模式
- 若开启了作业注册功能,即“/opt/client/Flink/flink/conf/flink-conf.yaml”文件中参数配置为“job.alarm.enable: true”、“job.register.enable: true”和“flinkServer.tenant.name: CLIENT_APP”时,运行作业命令中需要指定作业名,即添加参数“-Dyarn.application.name=作业名”。
- 普通集群(未开启Kerberos认证)
- 使用Session的attached模式提交作业
yarn-session.sh -nm "session-name"
重新打开一个客户端连接,提交作业:
source /opt/hadoopclient/bigdata_env
flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Application模式提交作业
flink run-application -t yarn-application /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Per-job模式提交作业
flink run -t yarn-per-job /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用yarn-cluster模式提交作业
flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Session的attached模式提交作业
- 安全集群(开启Kerberos认证)
- “flink.keystore”和“flink.truststore”文件路径为相对路径时:
- 使用Session的attached模式提交作业,其中“ssl/”是相对路径
cd /opt/hadoopclient/Flink/flink/conf/
yarn-session.sh -t ssl/ -nm "session-name"
... Cluster started: Yarn cluster with application id application_1624937999496_0017 JobManager Web Interface: http://192.168.1.150:32261
重新打开一个客户端连接,提交作业:
source /opt/hadoopclient/bigdata_env
flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
... Job has been submitted with JobID 587d5498fff18d8b2501fdf7ebb9c4fb Program execution finished Job with JobID 587d5498fff18d8b2501fdf7ebb9c4fb has finished. Job Runtime: 19917 ms
- 使用Application模式提交作业
cd /opt/hadoopclient/Flink/flink/conf/
flink run-application -t yarn-application -Dyarn.ship-files="ssl/" /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
... Submitted application application_1669179911005_0070 Waiting for the cluster to be allocated Deploying cluster, current state ACCEPTED YARN application has been deployed successfully. Found Web Interface xxx:xxx of application 'application_1669179911005_0070'.
- 使用Per-job模式提交作业
cd /opt/hadoopclient/Flink/flink/conf/
flink run -t yarn-per-job -Dyarn.ship-files="ssl/" /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
... Cluster started: Yarn cluster with application id application_1669179911005_0071 Job has been submitted with JobID 75011429a29f230121809f54f4570ed0 Program execution finished Job with JobID 75011429a29f230121809f54f4570ed0 has finished. Job Runtime: 21245 ms
- 使用yarn-cluster模式提交作业
flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Session的attached模式提交作业,其中“ssl/”是相对路径
- “flink.keystore”和“flink.truststore”文件路径为绝对路径时:
- 使用Session的attached模式提交作业
cd /opt/hadoopclient/Flink/flink/conf/
yarn-session.sh -nm "session-name"
重新打开一个客户端连接,提交作业:
source /opt/hadoopclient/bigdata_env
flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
或者:flink run -t yarn-session -Dyarn.application.id=application_xxx /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Application模式提交作业
cd /opt/hadoopclient/Flink/flink/conf/
flink run-application -t yarn-application /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Per-job模式提交作业
cd /opt/hadoopclient/Flink/flink/conf/
flink run -t yarn-per-job /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用yarn-cluster模式提交作业
flink run -m yarn-cluster /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
- 使用Session的attached模式提交作业
- “flink.keystore”和“flink.truststore”文件路径为相对路径时:
通过客户端注册到FlinkServer的作业,若未开启作业注册到FlinkServer功能,暂不支持在FlinkServer WebUI执行启动、开发、停止等操作。可参考Flink作业级巡检能力开启作业注册到FlinkServer功能。
- Session模式
- 使用运行用户登录FusionInsight Manager,进入Yarn服务的原生页面,找到对应作业的application,单击application名称,进入到作业详情页面。
- 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
- 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
图1 application