Updated on 2024-04-02 GMT+08:00

Running the Program

  1. Obtain pyflink-sql.py and datagen2kafka.sql from the sample project flink-examples/pyflink-example/pyflink-sql.
  2. Package the Python virtual environment prepared in Preparing for Development and Operating Environment into venv.zip. Run the command from the directory that contains venv/ so that the archive entries start with venv/:

    zip -q -r venv.zip venv/

  3. Log in to the active management node as user root and upload the venv.zip, pyflink-sql.py, and datagen2kafka.sql files obtained in steps 1 and 2 to the client environment. The target directory depends on the deployment mode:

    • per-job: Upload the files to Client installation directory/Flink/flink.
    • yarn-application: Upload the files to Client installation directory/Flink/flink/yarnship.
    • yarn-session: Upload the files to Client installation directory/Flink/flink/conf/ssl.

  4. Set file_path in pyflink-sql.py according to the deployment mode:

    • per-job: Set it to the actual path of the SQL file, for example, Client installation directory/Flink/flink/datagen2kafka.sql
    • yarn-application: Set it to os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"
    • yarn-session: Set it to the actual path of the SQL file, for example, Client installation directory/Flink/flink/conf/ssl/datagen2kafka.sql

  5. Run the following command to specify the Python interpreter used by the Flink client:

    export PYFLINK_CLIENT_EXECUTABLE=venv.zip/venv/bin/python3

  6. Submit the job using the command for the deployment mode:

    • per-job:
      ./bin/flink run --detached -t yarn-per-job -Dyarn.application.name=py_sql -pyarch venv.zip -pyexec venv.zip/venv/bin/python3 -py pyflink-sql.py

    • yarn-application:
      ./bin/flink run-application --detached -t yarn-application -Dyarn.application.name=py_sql -Dyarn.ship-files=/opt/client/Flink/flink/yarnship/ -pyarch yarnship/venv.zip -pyexec venv.zip/venv/bin/python3 -pyclientexec venv.zip/venv/bin/python3 -pyfs yarnship -pym pyflink-sql

    • yarn-session:

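      Before starting the YARN session, prepare the running environment as described in Preparing for Development and Operating Environment. Then run the following command to start a detached YARN session. In this command, -jm and -tm set the JobManager and TaskManager memory in MB, and -t ships the conf/ssl/ directory:

      ./bin/yarn-session.sh -jm 1024 -tm 4096 -t conf/ssl/ -d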

      Run the following command to submit the job. Replace application_1685505909197_0285 with the application ID of the YARN session started above:

      ./bin/flink run --detached -t yarn-session -Dyarn.application.name=py_sql -Dyarn.application.id=application_1685505909197_0285 -pyarch conf/ssl/venv.zip -pyexec conf/ssl/venv.zip/venv/bin/python3 -py conf/ssl/pyflink-sql.py

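Step 2 packages the virtual environment. The -pyexec venv.zip/venv/bin/python3 option in step 6 only works if the archive contains the interpreter at venv/bin/python3, which is the case when zip is run from the parent directory of venv. The following is a minimal local sanity check built on Python's standard zipfile module. The function name has_interpreter is hypothetical and is not part of the sample project.

```python
import zipfile


def has_interpreter(archive_path, interpreter="venv/bin/python3"):
    """Return True if the archive stores the interpreter at the expected entry.

    The default entry mirrors the -pyexec value venv.zip/venv/bin/python3,
    so the archive must contain an entry named exactly venv/bin/python3.
    """
    with zipfile.ZipFile(archive_path) as archive:
        return interpreter in archive.namelist()
```

For example, has_interpreter("venv.zip") should return True for an archive created with zip -q -r venv.zip venv/ from the parent directory. It returns False if the archive was created from inside venv/, because the entries then lack the venv/ prefix.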
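Step 3 places the same three files in a different directory for each deployment mode. The sketch below encodes that mapping and copies the files with shutil.copy2. It assumes the client is installed in /opt/client, which is the path used in the yarn-application command in step 6. The names UPLOAD_SUBDIRS, FILES, and stage_files are hypothetical.

```python
import os
import shutil

# Target directory, relative to the client installation directory, for each
# deployment mode, as listed in step 3.
UPLOAD_SUBDIRS = {
    "per-job": "Flink/flink",
    "yarn-application": "Flink/flink/yarnship",
    "yarn-session": "Flink/flink/conf/ssl",
}

FILES = ("venv.zip", "pyflink-sql.py", "datagen2kafka.sql")


def stage_files(mode, client_dir="/opt/client", src_dir="."):
    """Copy the sample files into the directory expected by the given mode.

    Returns the destination paths. Raises KeyError for an unknown mode.
    """
    target = os.path.join(client_dir, UPLOAD_SUBDIRS[mode])
    # Create the target directory if it does not already exist.
    os.makedirs(target, exist_ok=True)
    copied = []
    for name in FILES:
        dest = os.path.join(target, name)
        shutil.copy2(os.path.join(src_dir, name), dest)
        copied.append(dest)
    return copied
```

For example, stage_files("yarn-application") run from the upload directory would copy the three files into /opt/client/Flink/flink/yarnship.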
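The three file_path values in step 4 can be summarized as follows. This is a minimal sketch, not the code in the sample project:

- sql_file_path and read_statements are hypothetical helpers.
- The client directory defaults to /opt/client as an assumption.
- read_statements splits on semicolons naively, so it does not handle semicolons inside string literals or comments.

In a PyFlink job, each returned statement could then be passed to TableEnvironment.execute_sql.

```python
import os

SQL_FILE = "datagen2kafka.sql"


def sql_file_path(mode, client_dir="/opt/client"):
    """Return the file_path value for the given deployment mode (step 4)."""
    if mode == "per-job":
        return os.path.join(client_dir, "Flink/flink", SQL_FILE)
    if mode == "yarn-application":
        # In application mode the script runs in the JobManager's YARN
        # container, so the path is relative to that container's working
        # directory, as given in step 4.
        return os.getcwd() + "/../../../../yarnship/" + SQL_FILE
    if mode == "yarn-session":
        return os.path.join(client_dir, "Flink/flink/conf/ssl", SQL_FILE)
    raise ValueError(f"unknown mode: {mode}")


def read_statements(path):
    """Read a SQL file and return its non-empty statements (naive ';' split)."""
    with open(path, encoding="utf-8") as f:
        text = f.read()
    return [stmt.strip() for stmt in text.split(";") if stmt.strip()]
```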
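If the submission in steps 5 and 6 needs to be scripted, the commands can be assembled per mode. The sketch below reproduces the argument lists from step 6 and runs them with subprocess, setting PYFLINK_CLIENT_EXECUTABLE as in step 5. It assumes the following:

- It is started from Client installation directory/Flink/flink, where ./bin/flink is located.
- The environment from Preparing for Development and Operating Environment is already in place.
- For yarn-session, app_id is the ID of your running session.

build_command and submit are hypothetical names.

```python
import os
import subprocess

APP_NAME = "-Dyarn.application.name=py_sql"
PY_EXEC = "venv.zip/venv/bin/python3"


def build_command(mode, app_id=None):
    """Return the ./bin/flink argument list used in step 6 for the given mode."""
    if mode == "per-job":
        return ["./bin/flink", "run", "--detached", "-t", "yarn-per-job", APP_NAME,
                "-pyarch", "venv.zip", "-pyexec", PY_EXEC, "-py", "pyflink-sql.py"]
    if mode == "yarn-application":
        return ["./bin/flink", "run-application", "--detached", "-t", "yarn-application",
                APP_NAME, "-Dyarn.ship-files=/opt/client/Flink/flink/yarnship/",
                "-pyarch", "yarnship/venv.zip", "-pyexec", PY_EXEC,
                "-pyclientexec", PY_EXEC, "-pyfs", "yarnship", "-pym", "pyflink-sql"]
    if mode == "yarn-session":
        if not app_id:
            raise ValueError("yarn-session mode needs the application ID of the session")
        return ["./bin/flink", "run", "--detached", "-t", "yarn-session", APP_NAME,
                f"-Dyarn.application.id={app_id}",
                "-pyarch", "conf/ssl/venv.zip", "-pyexec", "conf/ssl/" + PY_EXEC,
                "-py", "conf/ssl/pyflink-sql.py"]
    raise ValueError(f"unknown mode: {mode}")


def submit(mode, app_id=None):
    """Run the submission command with PYFLINK_CLIENT_EXECUTABLE set (step 5)."""
    env = dict(os.environ, PYFLINK_CLIENT_EXECUTABLE=PY_EXEC)
    return subprocess.run(build_command(mode, app_id), env=env, check=True)
```

Passing an argument list instead of a shell string avoids quoting issues. check=True makes a failed submission raise CalledProcessError instead of passing silently.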