Using Python to Submit a Flink SQL Job
- Obtain pyflink-sql.py and datagen2kafka.sql from the sample project flink-examples/pyflink-example/pyflink-sql.
- Package the prepared Python virtual environment by referring to Preparing for Development and Operating Environment, then run the following command in the parent directory of venv/ to obtain the venv.zip file:
zip -q -r venv.zip venv/
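Because the submit commands pass -pyexec venv.zip/venv/bin/python3, the archive must contain the interpreter at venv/bin/python3. The following is a minimal sketch for checking this before uploading; the helper name has_venv_interpreter is hypothetical and not part of the sample project.

```python
import zipfile


def has_venv_interpreter(archive_path: str, interpreter: str = "venv/bin/python3") -> bool:
    """Return True if the zip archive contains the interpreter at the expected path.

    `interpreter` is the part of venv.zip/venv/bin/python3 that follows
    "venv.zip/", that is, the entry name inside the archive.
    """
    with zipfile.ZipFile(archive_path) as archive:
        return interpreter in archive.namelist()
```

For example, has_venv_interpreter("venv.zip") returning False usually means the archive was created from inside venv/ instead of from its parent directory.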
- Log in to the active management node as the root user and upload the venv.zip, pyflink-sql.py, and datagen2kafka.sql files obtained in the preceding steps to the client environment. The target directory depends on the deployment mode:
- Per-job: Upload the preceding files to Client installation directory/Flink/flink.
- yarn-application: Upload the preceding files to Client installation directory/Flink/flink/yarnship.
- yarn-session: Upload the preceding files to Client installation directory/Flink/flink/conf/ssl.
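If the three files are already on the node (for example, in the current directory), the upload step can be sketched in Python as below. The helper names upload_dir and stage_files are hypothetical, and /opt/client is only the example client installation directory used in the yarn-application command of this section.

```python
import shutil
from pathlib import Path
from typing import List

# Upload directory for each deployment mode, relative to the client installation directory.
UPLOAD_SUBDIRS = {
    "per-job": Path("Flink/flink"),
    "yarn-application": Path("Flink/flink/yarnship"),
    "yarn-session": Path("Flink/flink/conf/ssl"),
}

# Files prepared in the previous steps.
FILES = ("venv.zip", "pyflink-sql.py", "datagen2kafka.sql")


def upload_dir(mode: str, client_dir: str) -> Path:
    """Return the directory the job files must be copied to for the given mode."""
    try:
        return Path(client_dir) / UPLOAD_SUBDIRS[mode]
    except KeyError:
        raise ValueError(f"unknown mode {mode!r}; expected one of {sorted(UPLOAD_SUBDIRS)}") from None


def stage_files(mode: str, client_dir: str, source_dir: str = ".") -> List[Path]:
    """Copy the three job files from source_dir into the upload directory for the mode."""
    target = upload_dir(mode, client_dir)
    target.mkdir(parents=True, exist_ok=True)  # no-op if the directory already exists
    return [Path(shutil.copy2(Path(source_dir) / name, target / name)) for name in FILES]
```

For example, stage_files("yarn-application", "/opt/client") copies the files into /opt/client/Flink/flink/yarnship.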
- Change file_path in pyflink-sql.py to the location of datagen2kafka.sql for the deployment mode:
- Per-job: Change the path to the actual path of the SQL file. For example: Client installation directory/Flink/flink/datagen2kafka.sql
- yarn-application: Change the path to os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"
- yarn-session: Change the path to the actual path of the SQL file. For example: Client installation directory/Flink/flink/conf/ssl/datagen2kafka.sql
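The three file_path values can be summarized in one function. This is a sketch only: sql_file_path is a hypothetical helper, not code from pyflink-sql.py, and /opt/client is an assumed client installation directory.

```python
import os
import posixpath


def sql_file_path(mode: str, client_dir: str = "/opt/client") -> str:
    """Return the value to assign to file_path in pyflink-sql.py for a deployment mode."""
    if mode == "per-job":
        # The script runs on the client, so an absolute client path works.
        return posixpath.join(client_dir, "Flink/flink/datagen2kafka.sql")
    if mode == "yarn-application":
        # The script runs on the cluster and the yarnship directory is shipped with
        # the job, so the path is resolved from the working directory at run time.
        return os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"
    if mode == "yarn-session":
        return posixpath.join(client_dir, "Flink/flink/conf/ssl/datagen2kafka.sql")
    raise ValueError(f"unknown mode {mode!r}")
```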
- Run the following command to specify the Python interpreter that the Flink client uses when submitting the job:
export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3
- Run the command for your deployment mode to submit the job:
- Per-job:
./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_sql -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-sql.py
Execution result:
- yarn-application
./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_sql -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-sql
Execution result:
- yarn-session
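Before starting the YARN session, prepare the running environment by referring to Preparing for Development and Operating Environment. Then run the following command to start the session in detached mode (-d), shipping the conf/ssl/ directory that holds the uploaded files (-t) and setting the JobManager and TaskManager memory to 1024 MB and 4096 MB (-jm, -tm):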
bin/yarn-session.sh -jm 1024 -tm 4096 -t conf/ssl/ -d
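Run the following command to submit the job to the session. Replace application_1685505909197_0285 with the application ID of the YARN session you started: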
./bin/flink run --detached -t yarn-session -Dyarn.application.name=py_sql -Dyarn.application.id=application_1685505909197_0285 -pyarch conf/ssl/venv.zip -pyexec conf/ssl/venv.zip/venv/bin/python3 -py conf/ssl/pyflink-sql.py
Execution result:
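To compare the three modes side by side, the commands above can be expressed as argument lists and run with subprocess. This is a hedged sketch: build_command, find_app_id, and submit are hypothetical helpers, /opt/client is the example client installation directory from the yarn-application command, and the client environment is assumed to be prepared as described in Preparing for Development and Operating Environment.

```python
import os
import re
import subprocess
from typing import List, Optional

# Interpreter path inside venv.zip, as used by -pyexec and PYFLINK_CLIENT_EXECUTABLE above.
PYEXEC = "venv.zip/venv/bin/python3"

# YARN application IDs have the form application_<timestamp>_<sequence>.
APP_ID_RE = re.compile(r"application_\d+_\d+")


def find_app_id(text: str) -> Optional[str]:
    """Return the first YARN application ID found in text, or None."""
    match = APP_ID_RE.search(text)
    return match.group(0) if match else None


def build_command(mode: str, app_id: Optional[str] = None,
                  client_dir: str = "/opt/client") -> List[str]:
    """Return the flink CLI arguments used in this section for the given mode."""
    if mode == "per-job":
        return ["./bin/flink", "run", "--detached", "-t", "yarn-per-job",
                "-Dyarn.application.name=py_sql",
                "-pyarch", "venv.zip", "-pyexec", PYEXEC, "-py", "pyflink-sql.py"]
    if mode == "yarn-application":
        return ["./bin/flink", "run-application", "--detached", "-t", "yarn-application",
                "-Dyarn.application.name=py_sql",
                f"-Dyarn.ship-files={client_dir}/Flink/flink/yarnship/",
                "-pyarch", "yarnship/venv.zip", "-pyexec", PYEXEC,
                "-pyclientexec", PYEXEC, "-pyfs", "yarnship", "-pym", "pyflink-sql"]
    if mode == "yarn-session":
        # The job is attached to an existing session, so its application ID is required.
        if app_id is None or not APP_ID_RE.fullmatch(app_id):
            raise ValueError(f"yarn-session mode needs a valid application ID, got {app_id!r}")
        return ["./bin/flink", "run", "--detached", "-t", "yarn-session",
                "-Dyarn.application.name=py_sql", f"-Dyarn.application.id={app_id}",
                "-pyarch", "conf/ssl/venv.zip", "-pyexec", "conf/ssl/" + PYEXEC,
                "-py", "conf/ssl/pyflink-sql.py"]
    raise ValueError(f"unknown mode {mode!r}")


def submit(mode: str, client_dir: str = "/opt/client",
           app_id: Optional[str] = None) -> subprocess.CompletedProcess:
    """Run the command from Flink/flink with PYFLINK_CLIENT_EXECUTABLE set, as in the export step."""
    env = dict(os.environ, PYFLINK_CLIENT_EXECUTABLE=PYEXEC)
    return subprocess.run(build_command(mode, app_id, client_dir),
                          cwd=os.path.join(client_dir, "Flink", "flink"),
                          env=env, check=True)
```

For example, submit("yarn-session", app_id=find_app_id(text)) assumes text contains the session's application ID, such as captured output of yarn-session.sh; the exact format of that output is not covered here, so the regular expression only looks for the ID itself.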