Flink常见Shell命令
在使用Flink的Shell脚本前,首先需要执行以下操作,详细使用场景可参考运行wordcount作业:
- 安装Flink客户端,例如安装目录为“/opt/client”。
- 初始化环境变量。
source /opt/client/bigdata_env
- 如果当前集群已启用Kerberos认证,需先配置客户端认证,可参考,。如果当前集群未启用Kerberos认证,则无需执行该步骤。
- 参考表1运行相关命令。
表1 Flink Shell命令参考 命令
参数说明
描述
yarn-session.sh
-at,--applicationType <arg>:为Yarn application自定义类型。
-D <property=value>:动态参数配置。
-d,--detached:关闭交互模式,启动一个分离的Flink YARN session。
-h,--help: 显示Yarn session CLI的帮助。
-id,--applicationId <arg>:绑定到一个已经运行的Yarn session。
-j,--jar <arg>:设置用户jar包路径。
-jm,--jobManagerMemory <arg>:为JobManager设置内存。
-m,--jobmanager <arg>:要连接的JobManager的地址,使用该参数可以连接特定的JobManager。
-nl,--nodeLabel <arg>: 指定YARN application的nodeLabel 。
-nm,--name <arg>:为Yarn application自定义名称。
-q,--query:查询可用的Yarn 资源。
-qu,--queue <arg>:指定YARN 队列。
-s,--slots <arg>:设置每个Taskmanager的SLOT个数。
-t,--ship <arg>:指定待发送文件的目录。
-tm,--taskManagerMemory <arg>:为TaskManager设置内存。
-yd,--yarndetached:以分离模式启动。
-z,--zookeeperNamespace <args>:指定zookeeper的namespace。
-h:获取帮助。
启动一个常驻的Flink集群,接受来自Flink客户端的任务。
flink run
-c,--class <classname>:指定一个类作为程序运行的入口点。
-C,--classpath <url>:指定classpath。
-d,--detached:以分离方式运行job。
-files,--dependencyFiles <arg>:Flink程序依赖的文件。
-n,--allowNonRestoredState: 从快照点恢复时允许跳过不能恢复的状态。比如删除了程序中某个操作符,那么在恢复快照点时需要增加该参数。
-m,--jobmanager <host:port>:指定JobManager。
-p,--parallelism <parallelism>:指定job并行度,会覆盖配置文件中配置的并行度参数。
-q,--sysoutLogging:禁止flink日志输出至控制台。
-s,--fromSavepoint <savepointPath>:指定用于恢复job的savepoint路径。
-z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。
-yat,--yarnapplicationType <arg>:为Yarn application自定义类型。
-yD <arg>:动态参数配置。
-yd,--yarndetached:以分离模式启动。
-yh,--yarnhelp:获取yarn帮助。
-yid,--yarnapplicationId <arg>:绑定到yarn session运行job。
-yj,--yarnjar <arg>:设置Flink jar文件路径。
-yjm,--yarnjobManagerMemory <arg>:为JobManager设置内存(MB)。
-ynm,--yarnname <arg>:为Yarn application自定义名称。
-yq,--yarnquery:查询可用的YARN资源(内存、CPU)。
-yqu,--yarnqueue <arg>:指定YARN队列。
-ys,--yarnslots:设置每个TaskManager的SLOT个数。
-yt,--yarnship <arg>:指定待发送文件的路径。
-ytm,--yarntaskManagerMemory <arg>:为TaskManager设置内存(MB)。
-yz,--yarnzookeeperNamespace <arg>:指定zookeeper的namespace,需与yarn-session.sh -z 保持一致。
-h:获取帮助。
Flink提交作业。
1."-y*"参数是指yarn-cluster模式下使用。
2.非"-y*"参数用户在用该命令提交任务前需要先用yarn-session启动Flink集群。
flink run-application
-D<property=value>:允许指定多个通用配置选项。
-t,–target:给定应用程序的部署目标,例如yarn-application、yarn-per-job。
-h:获取帮助。
Flink run-application提交作业。
flink info
-c,--class <classname>:指定一个类作为程序运行的入口点。
-p,--parallelism <parallelism>:指定程序运行的并行度。
-h:获取帮助。
显示所运行程序的执行计划(JSON)
flink list
-a,--all:显示所有的Job。
-m,--jobmanager <host:port>:指定JobManager。
-r,--running:仅显示running状态的Job。
-s,--scheduled:仅显示scheduled状态的Job。
-z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。
-yid,--yarnapplicationId <arg>:绑定YARN session。
-h:获取帮助。
查询集群中运行的程序。
flink stop
-d,--drain:在触发savepoint和停止作业之前,发送MAX_WATERMARK。
-p,--savepointPath <savepointPath>:savepoint的储存路径,默认目录state.savepoints.dir。
-m,--jobmanager <host:port>:指定JobManager。
-z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。
-yid,--yarnapplicationId <arg>:绑定YARN session。
-h:获取帮助。
强制停止一个运行中的Job(仅支持streaming jobs、业务代码 source 端需要 implements StoppableFunction)
flink cancel
-m,--jobmanager <host:port>:指定JobManager。
-s,--withSavepoint <targetDirectory>:取消Job时触发savepoint,默认目录state.savepoints.dir
-z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。
-yid,--yarnapplicationId <arg>:绑定YARN session。
-h:获取帮助。
取消一个运行中Job
flink savepoint
-d,--dispose <arg>:指定savepoint的保存目录。
-m,--jobmanager <host:port>:指定JobManager。
-z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。
-yid,--yarnapplicationId <arg>:绑定YARN session。
-h:获取帮助。
触发一个savepoint
source客户端安装目录/bigdata_env
无
导入客户端环境变量。
使用限制:如果用户使用自定义脚本(例如A.sh)并在脚本中调用该命令,则脚本A.sh不能传入参数。如果确实需要给A.sh传入参数,则需采用二次调用方式。
例如A.sh中调用B.sh,在B.sh中调用该命令。A.sh可以传入参数,B.sh不能传入参数。
start-scala-shell.sh
local | remote <host> <port> | yarn:运行模式
scala shell启动脚本
sh generate_keystore.sh
-
用户调用“generate_keystore.sh”脚本工具生成“Security Cookie”、“flink.keystore”和“flink.truststore”。需要输入自定义密码(不能包含#)。