Python Sample Code

Function

Submit a Flink SQL job to Yarn through Python APIs.

Sample Code

The main logic code of pyflink-sql.py is provided below. Before submitting the job, set file_path to the path of the SQL file to be executed. You are advised to use a full path.

For details about the complete code, see pyflink-sql.py in flink-examples/pyflink-example/pyflink-sql.

import logging
import os
import sys

from pyflink.table import EnvironmentSettings, TableEnvironment


def read_sql(file_path):
    """Read the SQL script to be executed."""
    if not os.path.isfile(file_path):
        raise FileNotFoundError(file_path + " does not exist")
    with open(file_path) as sql_file:
        return sql_file.read()


def exec_sql():
    # Change the SQL path before job submission.
    file_path = "datagen2kafka.sql"
    sql = read_sql(file_path)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    statement_set = t_env.create_statement_set()
    # Split the script into individual statements on semicolons.
    for statement in sql.split(";"):
        statement = statement.strip()
        if statement.lower().startswith("create"):
            print("---------create---------------")
            print(statement)
            # DDL statements are executed immediately.
            t_env.execute_sql(statement)
        if statement.lower().startswith("insert"):
            print("---------insert---------------")
            print(statement)
            # INSERT statements are collected into the statement set.
            statement_set.add_insert_sql(statement)
    statement_set.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    exec_sql()
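All CREATE statements are executed immediately, while the INSERT statements are batched into the statement set and submitted together as one job when statement_set.execute() is called. Note that the script splits the file naively on semicolons; the following quick illustration of that behavior uses a hypothetical two-statement script:

sql = "CREATE TABLE t (a INT) WITH ('connector' = 'datagen');INSERT INTO s SELECT a FROM t;"
# Splitting on ";" and dropping empty fragments yields one string per statement.
statements = [s.strip() for s in sql.split(";") if s.strip()]
print(statements[0])  # CREATE TABLE t (a INT) WITH ('connector' = 'datagen')
print(statements[1])  # INSERT INTO s SELECT a FROM t

Because of this, a semicolon inside a string literal would incorrectly split a statement in two, so keep the SQL file free of embedded semicolons.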
Table 1 Parameters for submitting a SQL job using Python

Parameter: file_path

Description: Path of the datagen2kafka.sql file. You are advised to use a full path. Obtain the file from the secondary development sample code package and upload it to the specified directory on the client.

NOTE: If the job is submitted in yarn-application mode, replace the path as follows:

file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"

Example: file_path = /Client installation directory/Flink/flink/datagen2kafka.sql
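Put together, the two file_path settings from Table 1 would look as follows in pyflink-sql.py; both lines are taken from the table, so pick the one matching your submission mode:

# Default submission: full path to the SQL file on the client
# ("/Client installation directory" is a placeholder to substitute).
file_path = "/Client installation directory/Flink/flink/datagen2kafka.sql"

# yarn-application submission: read the SQL file from the shipped yarnship directory.
file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"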

The following is an example of the SQL script (datagen2kafka.sql):

CREATE TABLE kafka_sink (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) WITH (
  'connector' = 'kafka',
  'topic' = 'input2',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup2',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
CREATE TABLE datagen_source (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
INSERT INTO
  kafka_sink
SELECT
  *
FROM
  datagen_source;
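Once the job is running, the datagen source writes one JSON row per second to the input2 topic. To spot-check the output, you could tail that topic; the following is a minimal sketch, assuming the third-party kafka-python package is installed and the Kafka listener is unsecured (both assumptions, not part of the sample):

from kafka import KafkaConsumer  # assumption: kafka-python is available

# Tail the sink topic; replace the placeholder with the broker address and port.
consumer = KafkaConsumer(
    "input2",
    bootstrap_servers="IP address of the Kafka broker instance:Kafka port number",
    auto_offset_reset="latest",
)
for record in consumer:
    print(record.value.decode("utf-8"))  # one JSON row per message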