更新时间:2024-01-24 GMT+08:00

Flink常见Shell命令

本章节适用于MRS 3.x及之后版本。

在使用Flink的Shell脚本前,首先需要执行以下操作,详细使用场景可参考从零开始使用Flink运行wordcount作业:

  1. 安装Flink客户端,例如安装目录为“/opt/client”。
  2. 初始化环境变量。

    source /opt/client/bigdata_env

  3. 如果当前集群已启用Kerberos认证,需先配置客户端认证,可参考5。如果当前集群未启用Kerberos认证,则无需执行该步骤。
  4. 参考表1运行相关命令。

    表1 Flink Shell命令参考

    命令

    参数说明

    描述

    yarn-session.sh

    -at,--applicationType <arg>:为Yarn application自定义类型。

    -D <property=value>:动态参数配置。

    -d,--detached:关闭交互模式,启动一个分离的Flink YARN session。

    -h,--help: 显示Yarn session CLI的帮助。

    -id,--applicationId <arg>:绑定到一个已经运行的Yarn session。

    -j,--jar <arg>:设置用户jar包路径。

    -jm,--jobManagerMemory <arg>:为JobManager设置内存。

    -m,--jobmanager <arg>:要连接的JobManager的地址,使用该参数可以连接特定的JobManager。

    -nl,--nodeLabel <arg>: 指定YARN application的nodeLabel 。

    -nm,--name <arg>:为Yarn application自定义名称。

    -q,--query:查询可用的Yarn 资源。

    -qu,--queue <arg>:指定YARN 队列。

    -s,--slots <arg>:设置每个Taskmanager的SLOT个数。

    -t,--ship <arg>:指定待发送文件的目录。

    -tm,--taskManagerMemory <arg>:为TaskManager设置内存。

    -yd,--yarndetached:以分离模式启动。

    -z,--zookeeperNamespace <args>:指定zookeeper的namespace。

    -h:获取帮助。

    启动一个常驻的Flink集群,接受来自Flink客户端的任务。

    flink run

    -c,--class <classname>:指定一个类作为程序运行的入口点。

    -C,--classpath <url>:指定classpath。

    -d,--detached:以分离方式运行job。

    -n,--allowNonRestoredState: 从快照点恢复时允许跳过不能恢复的状态。比如删除了程序中某个操作符,那么在恢复快照点时需要增加该参数。

    -m,--jobmanager <host:port>:指定JobManager。

    -p,--parallelism <parallelism>:指定job并行度,会覆盖配置文件中配置的并行度参数。

    -q,--sysoutLogging:禁止flink日志输出至控制台。

    -s,--fromSavepoint <savepointPath>:指定用于恢复job的savepoint路径。

    -z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。

    -yat,--yarnapplicationType <arg>:为Yarn application自定义类型。

    -yD <arg>:动态参数配置。

    -yd,--yarndetached:以分离模式启动。

    -yh,--yarnhelp:获取yarn帮助。

    -yid,--yarnapplicationId <arg>:绑定到yarn session运行job。

    -yj,--yarnjar <arg>:设置Flink jar文件路径。

    -yjm,--yarnjobManagerMemory <arg>:为JobManager设置内存(MB)。

    -ynm,--yarnname <arg>:为Yarn application自定义名称。

    -yq,--yarnquery:查询可用的YARN资源(内存、CPU)。

    -yqu,--yarnqueue <arg>:指定YARN队列。

    -ys,--yarnslots:设置每个TaskManager的SLOT个数。

    -yt,--yarnship <arg>:指定待发送文件的路径。

    -ytm,--yarntaskManagerMemory <arg>:为TaskManager设置内存(MB)。

    -yz,--yarnzookeeperNamespace <arg>:指定zookeeper的namespace,需与yarn-session.sh -z 保持一致。

    -h:获取帮助。

    Flink提交作业。

    1. "-y*"参数是指yarn-cluster模式下使用。
    2. 非"-y*"参数用户在用该命令提交任务前需要先用yarn-session启动Flink集群。

    flink info

    -c,--class <classname>:指定一个类作为程序运行的入口点。

    -p,--parallelism <parallelism>:指定程序运行的并行度。

    -h:获取帮助。

    显示所运行程序的执行计划(JSON)

    flink list

    -a,--all:显示所有的Job。

    -m,--jobmanager <host:port>:指定JobManager。

    -r,--running:仅显示running状态的Job。

    -s,--scheduled:仅显示scheduled状态的Job。

    -z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。

    -yid,--yarnapplicationId <arg>:绑定YARN session。

    -h:获取帮助。

    查询集群中运行的程序。

    flink stop

    -d,--drain:在触发savepoint和停止作业之前,发送MAX_WATERMARK。

    -p,--savepointPath <savepointPath>:savepoint的储存路径,默认目录state.savepoints.dir。

    -m,--jobmanager <host:port>:指定JobManager。

    -z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。

    -yid,--yarnapplicationId <arg>:绑定YARN session。

    -h:获取帮助。

    强制停止一个运行中的Job(仅支持streaming jobs、业务代码 source 端需要 implements StoppableFunction)

    flink cancel

    -m,--jobmanager <host:port>:指定JobManager。

    -s,--withSavepoint <targetDirectory>:取消Job时触发savepoint,默认目录state.savepoints.dir

    -z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。

    -yid,--yarnapplicationId <arg>:绑定YARN session。

    -h:获取帮助。

    取消一个运行中Job

    flink savepoint

    -d,--dispose <arg>:指定savepoint的保存目录。

    -m,--jobmanager <host:port>:指定JobManager。

    -z,--zookeeperNamespace <zookeeperNamespace>:指定zookeeper的namespace。

    -yid,--yarnapplicationId <arg>:绑定YARN session。

    -h:获取帮助。

    触发一个savepoint

    source客户端安装目录/bigdata_env

    导入客户端环境变量。

    使用限制:如果用户使用自定义脚本(例如A.sh)并在脚本中调用该命令,则脚本A.sh不能传入参数。如果确实需要给A.sh传入参数,则需采用二次调用方式。

    例如A.sh中调用B.sh,在B.sh中调用该命令。A.sh可以传入参数,B.sh不能传入参数。

    start-scala-shell.sh

    local | remote <host> <port> | yarn:运行模式

    scala shell启动脚本

    sh generate_keystore.sh

    -

    用户调用“generate_keystore.sh”脚本工具生成“Security Cookie”、“flink.keystore”和“flink.truststore”。需要输入自定义密码(不能包含#)。