Python Sample Code
Function
Submit a Flink SQL job to Yarn through Python APIs.
Sample Code
The main logic code in pyflink-sql.py is provided. Before submitting the code, ensure that file_path is set to the path of the SQL file to be executed. You are advised to use a full path.
For details about the complete code, see pyflink-sql.py in flink-examples/pyflink-example/pyflink-sql.
import logging
import sys
import os

from pyflink.table import (EnvironmentSettings, TableEnvironment)


def read_sql(file_path):
    # Read the whole SQL file; fail fast if the path is wrong.
    if not os.path.isfile(file_path):
        raise FileNotFoundError(file_path + " does not exist")
    with open(file_path) as f:
        return f.read()


def exec_sql():
    # Change the SQL path before job submission.
    file_path = "datagen2kafka.sql"
    sql = read_sql(file_path)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    statement_set = t_env.create_statement_set()

    # Split the file into individual statements: CREATE statements are run
    # immediately; INSERT statements are collected into one statement set.
    sqlArr = sql.split(";")
    for sqlStr in sqlArr:
        sqlStr = sqlStr.strip()
        if sqlStr.lower().startswith("create"):
            print("---------create---------------")
            print(sqlStr)
            t_env.execute_sql(sqlStr)
        if sqlStr.lower().startswith("insert"):
            print("---------insert---------------")
            print(sqlStr)
            statement_set.add_insert_sql(sqlStr)
    statement_set.execute()


if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    exec_sql()
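Note that all INSERT statements are added to a single statement set, so they are submitted together as one job. If you prefer not to hardcode the SQL file path, the following minimal sketch (an illustrative variant, not part of the shipped sample) accepts the path as a command-line argument:

import sys

from pyflink.table import EnvironmentSettings, TableEnvironment


def exec_sql_file(file_path):
    # Same dispatch logic as exec_sql, but the path is supplied by the caller.
    with open(file_path) as f:
        sql = f.read()
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    statement_set = t_env.create_statement_set()
    for stmt in (s.strip() for s in sql.split(";")):
        if stmt.lower().startswith("create"):
            t_env.execute_sql(stmt)
        elif stmt.lower().startswith("insert"):
            statement_set.add_insert_sql(stmt)
    statement_set.execute()


if __name__ == '__main__':
    # Usage: python pyflink-sql.py /full/path/to/datagen2kafka.sql
    exec_sql_file(sys.argv[1] if len(sys.argv) > 1 else "datagen2kafka.sql")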
Parameter | Description | Example
---|---|---
file_path | Path of the datagen2kafka.sql file. You are advised to use a full path. Obtain the package from the secondary development sample code and upload it to the specified directory on the client. NOTE: If the job is to be submitted in yarn-application mode, use the following path instead: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" | file_path = /Client installation directory/Flink/flink/datagen2kafka.sql
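To keep one script working for both submission modes, the path switch could be made at run time. The sketch below is illustrative only; the SUBMIT_MODE environment variable is a hypothetical convention, not part of the sample:

import os

# Hypothetical convention: export SUBMIT_MODE=yarn-application before a
# yarn-application submission; any other value keeps the client-side path.
if os.environ.get("SUBMIT_MODE") == "yarn-application":
    # SQL file shipped with the job (see the NOTE in the table above).
    file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"
else:
    file_path = "datagen2kafka.sql"  # replace with the full client-side path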
The following is an example of the SQL statements:
create table kafka_sink (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) with (
  'connector' = 'kafka',
  'topic' = 'input2',
  'properties.bootstrap.servers' = 'IP address of the Kafka broker instance:Kafka port number',
  'properties.group.id' = 'testGroup2',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);

create table datagen_source (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) with (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);

INSERT INTO kafka_sink SELECT * FROM datagen_source;
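To spot-check that rows are reaching the input2 topic after the job starts, something like the following could be used. This is a sketch assuming the third-party kafka-python package is installed (pip install kafka-python); the broker address is a placeholder you must replace with the values used in 'properties.bootstrap.servers' above:

from kafka import KafkaConsumer  # third-party package: kafka-python

consumer = KafkaConsumer(
    'input2',                              # topic written by kafka_sink
    bootstrap_servers='broker-ip:9092',    # placeholder: your broker IP and port
    auto_offset_reset='latest',
)
for message in consumer:
    print(message.value.decode('utf-8'))   # one JSON-encoded row per message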