更新时间:2024-06-05 GMT+08:00
使用Python提交Flink普通作业
- 获取样例工程“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”和“insertData2kafka.sql”。
- 参考准备本地应用开发环境将准备好的Python虚拟环境打包,获取“venv.zip”文件。
zip -q -r venv.zip venv/
- 以root用户登录主管理节点,将1和2获取的“venv.zip”、“pyflink-kafka.py”和“insertData2kafka.sql”文件上传至客户端环境。
- per-job模式:将上述文件上传到“客户端安装目录/Flink/flink”。
- yarn-application模式:将上述文件和“flink-connector-kafka-实际版本号.jar”包上传到“客户端安装目录/Flink/flink/yarnship”。
- 修改“pyflink-kafka.py”中的“specific_jars”路径。
- per-job模式:修改为SQL文件的实际路径。如:file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-实际版本号.jar
- yarn-application模式:修改为:file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-实际版本号.jar
- 修改“pyflink-kafka.py”中的“file_path”路径。
- per-job模式:修改为sql文件的实际路径。如:客户端安装目录/Flink/flink/insertData2kafka.sql
- yarn-application模式:修改为os.getcwd() + "/../../../../yarnship/insertData2kafka.sql"
- 执行以下命令指定运行环境。
export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3
- 执行以下命令运行程序。
- per-job模式:
./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_kafka -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-kafka.py
运行结果:
- yarn-application模式
./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_kafka -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-kafka
运行结果:
- per-job模式:
父主题: PyFlink样例程序