更新时间:2024-08-03 GMT+08:00

使用Python提交Flink SQL作业

  1. 获取样例工程“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”和“datagen2kafka.sql”。
  2. 参考准备本地应用开发环境将准备好的Python虚拟环境打包,获取“venv.zip”文件。

    zip -q -r venv.zip venv/

  3. root用户登录主管理节点,将12获取的“venv.zip”、“pyflink-sql.py”和“datagen2kafka.sql”文件上传至客户端环境。

    • per-job模式:将上述文件上传到“客户端安装目录/Flink/flink”。
    • yarn-application模式:将上述文件上传到“客户端安装目录/Flink/flink/yarnship”。
    • yarn-session模式:将上述文件上传到“客户端安装目录/Flink/flink/conf/ssl”。

  4. 修改“pyflink-sql.py”中的“file_path”路径。

    • per-job模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/datagen2kafka.sql
    • yarn-application模式:修改为os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"
    • yarn-session模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/conf/ssl//datagen2kafka.sql

  5. 执行下面命令指定运行环境。

    export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3

  6. 执行以下命令运行程序。

    • per-job模式
      ./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_sql -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-sql.py

      运行结果:

    • yarn-application模式
      ./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_sql -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-sql

      运行结果:

    • yarn-session模式

      在启动yarnsession之前需要参考准备本地应用开发环境章节准备运行环境,使用下面命令启动yarn-session:

       bin/yarn-session.sh -jm 1024 -tm 4096 -t conf/ssl/ -d

      使用下面命令提交任务:

      ./bin/flink run --detached -t yarn-session -Dyarn.application.name=py_sql -Dyarn.application.id=application_1685505909197_0285 -pyarch conf/ssl/venv.zip -pyexec conf/ssl/venv.zip/venv/bin/python3 -py conf/ssl/pyflink-sql.py

      运行结果: