PyFlink样例程序代码说明
通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例
下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。
import os import logging import sys from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema from pyflink.common.typeinfo import Types from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer from pyflink.datastream import StreamExecutionEnvironment from pyflink.table import TableEnvironment, EnvironmentSettings def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交前修改sql路径 # file_path = "/opt/client/Flink/flink/insertData2kafka.sql" # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql" file_path = "insertData2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() def read_write_kafka(): # find kafka connector jars env = StreamExecutionEnvironment.get_execution_environment() env.set_parallelism(1) specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar" # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar" # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar" # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues env.add_jars(specific_jars) kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'} deserialization_schema = JsonRowDeserializationSchema.builder() \ .type_info(type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_consumer = FlinkKafkaConsumer( topics='test_source_topic', deserialization_schema=deserialization_schema, properties=kafka_properties) print("---------read ---------------") ds = env.add_source(kafka_consumer) serialization_schema = JsonRowSerializationSchema.builder().with_type_info( type_info=Types.ROW([Types.INT(), Types.STRING()])).build() kafka_producer = FlinkKafkaProducer( topic='test_sink_topic', serialization_schema=serialization_schema, producer_config=kafka_properties) print("--------write------------------") ds.add_sink(kafka_producer) env.execute("pyflink kafka test") if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") print("------------------insert data to kafka----------------") exec_sql() print("------------------read_write_kafka----------------") read_write_kafka()
参数 |
说明 |
示例 |
---|---|---|
bootstrap.servers |
Kafka的Broker实例业务IP和端口。 |
192.168.12.25:21005 |
specific_jars |
“客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。
说明:
当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准: specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar" |
specific_jars = file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar |
file_path |
“insertData2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。
说明:
当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql" |
file_path = /客户端安装目录/Flink/flink/insertData2kafka.sql |
create table kafka_sink_table ( age int, name varchar(10) ) with ( 'connector' = 'kafka', 'topic' = 'test_source_topic', --写入Kafka的topic名称,需确保与上述Python文件中的topic相同 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'test_group', 'format' = 'json' ); create TABLE datagen_source_table ( age int, name varchar(10) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink_table SELECT * FROM datagen_source_table;
通过Python API的方式提交Flink SQL作业到Yarn上代码样例
下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。
完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。
import logging import sys import os from pyflink.table import (EnvironmentSettings, TableEnvironment) def read_sql(file_path): if not os.path.isfile(file_path): raise TypeError(file_path + " does not exist") all_the_text = open(file_path).read() return all_the_text def exec_sql(): # 提交之前修改SQL路径 file_path = "datagen2kafka.sql" sql = read_sql(file_path) t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode()) statement_set = t_env.create_statement_set() sqlArr = sql.split(";") for sqlStr in sqlArr: sqlStr = sqlStr.strip() if sqlStr.lower().startswith("create"): print("---------create---------------") print(sqlStr) t_env.execute_sql(sqlStr) if sqlStr.lower().startswith("insert"): print("---------insert---------------") print(sqlStr) statement_set.add_insert_sql(sqlStr) statement_set.execute() if __name__ == '__main__': logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s") exec_sql()
参数 |
说明 |
示例 |
---|---|---|
file_path |
“datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。
说明:
当作业需要以yarn-application模式提交时,需替换如下路径: file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql" |
file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql |
SQL示例:
create table kafka_sink ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) with ( 'connector' = 'kafka', 'topic' = 'input2', 'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号', 'properties.group.id' = 'testGroup2', 'scan.startup.mode' = 'latest-offset', 'format' = 'json' ); create TABLE datagen_source ( uuid varchar(20), name varchar(10), age int, ts timestamp(3), p varchar(20) ) WITH ( 'connector' = 'datagen', 'rows-per-second' = '1' ); INSERT INTO kafka_sink SELECT * FROM datagen_source;