从零开始使用Flink
操作场景
本章节提供一个使用Flink运行wordcount作业的操作入门指导。
前提条件
- MRS集群中已安装Flink组件且集群内各组件正常运行。
- 已安装集群客户端,例如安装目录为“/opt/hadoopclient”。
操作步骤
- 以客户端安装用户,登录安装客户端的节点。
- 执行以下命令,切换到客户端安装目录。
cd /opt/hadoopclient
- 执行如下命令初始化环境变量。
source /opt/hadoopclient/bigdata_env
- 若集群开启了Kerberos认证,需要执行以下步骤,若集群未开启Kerberos认证请跳过该步骤。
- 准备一个用于提交Flink作业的用户,例如test。
人机用户创建后,使用新用户登录FusionInsight Manager并根据界面提示修改初始密码。
用户在Flink提交作业或者运行作业时,应具有如下权限:
- 如果启用Ranger鉴权,当前用户必须属于hadoop组或者已在Ranger中为该用户添加“/flink”目录的读写权限。
- 如果停用Ranger鉴权,当前用户必须属于hadoop组。
- 登录FusionInsight Manager界面,选择“系统 > 权限 > 用户”,在已增加用户所在行的“操作”列,选择“更多 > 下载认证凭据”,下载用户对应的认证凭据文件到本地并解压。
- 将解压得到的“user.keytab”、“krb5.conf”文件拷贝到客户端节点的“/opt/hadoopclient/Flink/flink/conf”目录下。
- 登录客户端节点,将客户端节点的业务IP地址以及Manager的浮动IP地址追加到“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”文件中的“jobmanager.web.allow-access-address”配置项中,IP地址之间使用英文逗号分隔。
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
- 配置安全认证。
在“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”配置文件中的对应配置添加keytab路径以及用户名。
security.kerberos.login.keytab: <user.keytab文件路径> security.kerberos.login.principal: <用户名>
例如:
security.kerberos.login.keytab: /opt/hadoopclient/Flink/flink/conf/user.keytab security.kerberos.login.principal: test
- 参考认证和加密进行安全加固配置,执行如下命令,并设置为一个用于提交作业的密码。
cd /opt/hadoopclient/Flink/flink/bin
sh generate_keystore.sh
该脚本会自动替换“/opt/hadoopclient/Flink/flink/conf/flink-conf.yaml”中关于SSL的相关配置参数值。
- 配置客户端访问“flink.keystore”和“flink.truststore”文件的路径配置。
- 绝对路径
执行“generate_keystore.sh”脚本后,默认在“flink-conf.yaml”文件中将“flink.keystore”和“flink.truststore”文件路径自动配置为绝对路径,此时需要将“conf”目录中的“flink.keystore”和“flink.truststore”文件分别放置在Flink客户端以及Yarn各个节点的该绝对路径上。
- 相对路径(推荐)
请执行如下步骤配置“flink.keystore”和“flink.truststore”文件路径为相对路径,并确保Flink客户端执行命令的目录可以直接访问该相对路径。
- 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如“ssl”。
cd /opt/hadoopclient/Flink/flink/conf/
mkdir ssl
- 移动“flink.keystore”和“flink.truststore”文件到新建目录中。
mv flink.keystore ssl/
mv flink.truststore ssl/
- 修改“flink-conf.yaml”文件中如下两个参数为相对路径。
vi /opt/hadoopclient/Flink/flink/conf/flink-conf.yaml
security.ssl.keystore: ssl/flink.keystore security.ssl.truststore: ssl/flink.truststore
- 在“/opt/hadoopclient/Flink/flink/conf/”目录下新建目录,例如“ssl”。
- 绝对路径
- 准备一个用于提交Flink作业的用户,例如test。
- 运行wordcount作业。
- 普通集群(未开启Kerberos认证)
- 安全集群(开启Kerberos认证)
- “flink.keystore”和“flink.truststore”文件路径为相对路径时:
- 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl/”是相对路径。
cd /opt/hadoopclient/Flink/flink/conf/
yarn-session.sh -t ssl/ -nm "session-name"
... Cluster started: Yarn cluster with application id application_1624937999496_0017 JobManager Web Interface: http://192.168.1.150:32261
重新打开一个客户端连接,提交作业:
source /opt/hadoopclient/bigdata_env
flink run /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
... Job has been submitted with JobID 587d5498fff18d8b2501fdf7ebb9c4fb Program execution finished Job with JobID 587d5498fff18d8b2501fdf7ebb9c4fb has finished. Job Runtime: 19917 ms
- 执行如下命令在Yarn上提交单个作业。
cd /opt/hadoopclient/Flink/flink/conf/
flink run -m yarn-cluster -yt ssl/ /opt/hadoopclient/Flink/flink/examples/streaming/WordCount.jar
... Cluster started: Yarn cluster with application id application_1624937999496_0016 Job has been submitted with JobID e9c59fb48f44feae7b62dd90336d6d7f Program execution finished Job with JobID e9c59fb48f44feae7b62dd90336d6d7f has finished. Job Runtime: 18155 ms
- 在“ssl”的同级目录下执行如下命令启动session,并在session中提交作业,其中“ssl/”是相对路径。
- “flink.keystore”和“flink.truststore”文件路径为绝对路径时:
- “flink.keystore”和“flink.truststore”文件路径为相对路径时:
- 使用运行用户登录FusionInsight Manager,进入Yarn服务的原生页面,找到对应作业的application,单击application名称,进入到作业详情页面。
- 若作业尚未结束,可单击“Tracking URL”链接进入到Flink的原生页面,查看作业的运行信息。
- 若作业已运行结束,对于在session中提交的作业,可以单击“Tracking URL”链接登录Flink原生页面查看作业信息。
图1 application