更新时间:2024-10-31 GMT+08:00

PyFlink样例程序代码说明

通过Python API的方式提交Flink读写Kafka作业到Yarn上代码样例

下面列出pyflink-kafka.py的主要逻辑代码作为演示,在提交之前需要确保“file_path”为要运行的SQL文件的路径,建议写全路径。

完整代码参见“flink-examples/pyflink-example/pyflink-kafka”中的“pyflink-kafka.py”。
import os
import logging
import sys
from pyflink.common import JsonRowDeserializationSchema, JsonRowSerializationSchema
from pyflink.common.typeinfo import Types
from pyflink.datastream.connectors import FlinkKafkaProducer, FlinkKafkaConsumer
from pyflink.datastream import StreamExecutionEnvironment
from pyflink.table import TableEnvironment, EnvironmentSettings
def read_sql(file_path):
    """Return the entire text of the SQL script at ``file_path``.

    Args:
        file_path: Path of the SQL file to read.

    Returns:
        The file content as a single string.

    Raises:
        TypeError: If ``file_path`` is not an existing regular file.
            TypeError (rather than FileNotFoundError) is kept so that
            callers which already catch it keep working.
    """
    if not os.path.isfile(file_path):
        raise TypeError(file_path + " does not exist")
    # The context manager closes the handle even if read() fails, instead of
    # leaving the file open until the garbage collector gets to it.
    with open(file_path) as sql_file:
        return sql_file.read()
def exec_sql():
    """Run the statements of the SQL script through a streaming TableEnvironment.

    The script text is split on ";". A fragment that starts with "create"
    (case-insensitive, after stripping whitespace) is executed immediately.
    A fragment that starts with "insert" is added to a statement set, which
    is executed once after the loop. Any other fragment is ignored.
    """
    # Change the SQL path before submitting. Alternatives:
    # file_path = "/opt/client/Flink/flink/insertData2kafka.sql"
    # file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql"
    # file_path = "/opt/client/Flink/flink/conf/ssl/insertData2kafka.sql"
    file_path = "insertData2kafka.sql"
    script = read_sql(file_path)

    settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(settings)
    pending_inserts = table_env.create_statement_set()

    for fragment in script.split(";"):
        statement = fragment.strip()
        lowered = statement.lower()
        if lowered.startswith("create"):
            print("---------create---------------")
            print(statement)
            table_env.execute_sql(statement)
        elif lowered.startswith("insert"):
            print("---------insert---------------")
            print(statement)
            pending_inserts.add_insert_sql(statement)

    pending_inserts.execute()
def read_write_kafka():
    """Copy JSON records from "test_source_topic" to "test_sink_topic".

    Builds a DataStream job with parallelism 1 that reads rows with a
    FlinkKafkaConsumer and writes them unchanged with a FlinkKafkaProducer,
    then submits it under the job name "pyflink kafka test".
    """
    env = StreamExecutionEnvironment.get_execution_environment()
    env.set_parallelism(1)

    # Location of the Kafka connector jar. Alternatives:
    # specific_jars = "file://" + os.getcwd() + "/../../../../yarnship/flink-connector-kafka-xxx.jar"
    # specific_jars = "file:///opt/client/Flink/flink/conf/ssl/flink-connector-kafka-xxx.jar"
    # the sql connector for kafka is used here as it's a fat jar and could avoid dependency issues
    specific_jars = "file:///opt/client/Flink/flink/lib/flink-connector-kafka-xxx.jar"
    env.add_jars(specific_jars)

    kafka_properties = {'bootstrap.servers': '192.168.20.162:21005', 'group.id': 'test_group'}

    # The JSON row schemas match record keys against the row's field names.
    # Types.ROW([...]) would name the fields f0/f1, which never match the
    # "age"/"name" columns of kafka_sink_table in insertData2kafka.sql, so
    # every field would be read as null. Name the fields explicitly.
    row_type_info = Types.ROW_NAMED(['age', 'name'], [Types.INT(), Types.STRING()])

    deserialization_schema = JsonRowDeserializationSchema.builder() \
        .type_info(type_info=row_type_info).build()
    kafka_consumer = FlinkKafkaConsumer(
        topics='test_source_topic',
        deserialization_schema=deserialization_schema,
        properties=kafka_properties)
    print("---------read ---------------")
    ds = env.add_source(kafka_consumer)

    serialization_schema = JsonRowSerializationSchema.builder().with_type_info(
        type_info=row_type_info).build()
    kafka_producer = FlinkKafkaProducer(
        topic='test_sink_topic',
        serialization_schema=serialization_schema,
        producer_config=kafka_properties)
    print("--------write------------------")
    ds.add_sink(kafka_producer)

    env.execute("pyflink kafka test")
if __name__ == "__main__":
    # Log records at INFO level and above go to stdout as bare messages.
    logging.basicConfig(format="%(message)s", level=logging.INFO, stream=sys.stdout)

    # First run the SQL script, then the DataStream Kafka-to-Kafka job.
    print("------------------insert data to kafka----------------")
    exec_sql()

    print("------------------read_write_kafka----------------")
    read_write_kafka()
表1 使用Python提交普通作业参数说明

参数

说明

示例

bootstrap.servers

Kafka的Broker实例业务IP和端口。

192.168.12.25:21005

specific_jars

“客户端安装目录/Flink/flink/lib/flink-connector-kafka-*.jar”包路径,建议写全路径。

说明:

当作业需要以yarn-application模式提交时,需替换如下路径,jar包版本号请以实际为准:

specific_jars="file://"+os.getcwd()+"/../../../../yarnship/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar"

specific_jars = "file:///客户端安装目录/Flink/flink/lib/flink-connector-kafka-1.15.0-h0.cbu.mrs.330.r13.jar"

file_path

“insertData2kafka.sql”文件路径,建议写全路径。需在二次开发样例代码中获取并上传至客户端指定目录。

说明:

当作业需要以yarn-application模式提交时,需替换如下路径:

file_path = os.getcwd() + "/../../../../yarnship/insertData2kafka.sql"

file_path = "/客户端安装目录/Flink/flink/insertData2kafka.sql"

SQL示例:
create table kafka_sink_table ( -- Kafka-backed table that the INSERT below writes into
  age int,
  name varchar(10)
) with (
  'connector' = 'kafka',
  'topic' = 'test_source_topic', -- Kafka topic to write to. It must match the topic used in the Python file above
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'test_group',
  'format' = 'json'
);
create TABLE datagen_source_table ( -- source table backed by the datagen connector
   age int,
  name varchar(10)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
INSERT INTO -- copy every row of the datagen source into the Kafka table
  kafka_sink_table
SELECT
  *
FROM
  datagen_source_table;

通过Python API的方式提交Flink SQL作业到Yarn上代码样例

下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path”为要运行的SQL文件的路径,建议写全路径。

完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。

import logging
import sys
import os
from pyflink.table import (EnvironmentSettings, TableEnvironment)
def read_sql(file_path):
    """Return the entire text of the SQL script at ``file_path``.

    Args:
        file_path: Path of the SQL file to read.

    Returns:
        The file content as a single string.

    Raises:
        TypeError: If ``file_path`` is not an existing regular file.
            TypeError (rather than FileNotFoundError) is kept so that
            callers which already catch it keep working.
    """
    if not os.path.isfile(file_path):
        raise TypeError(file_path + " does not exist")
    # The context manager closes the handle even if read() fails, instead of
    # leaving the file open until the garbage collector gets to it.
    with open(file_path) as sql_file:
        return sql_file.read()
def exec_sql():
    """Run the statements of the SQL script through a streaming TableEnvironment.

    The script text is split on ";". A fragment that starts with "create"
    (case-insensitive, after stripping whitespace) is executed immediately.
    A fragment that starts with "insert" is added to a statement set, which
    is executed once after the loop. Any other fragment is ignored.
    """
    # Change the SQL path before submitting.
    file_path = "datagen2kafka.sql"
    script = read_sql(file_path)

    settings = EnvironmentSettings.in_streaming_mode()
    table_env = TableEnvironment.create(settings)
    pending_inserts = table_env.create_statement_set()

    for fragment in script.split(";"):
        statement = fragment.strip()
        lowered = statement.lower()
        if lowered.startswith("create"):
            print("---------create---------------")
            print(statement)
            table_env.execute_sql(statement)
        elif lowered.startswith("insert"):
            print("---------insert---------------")
            print(statement)
            pending_inserts.add_insert_sql(statement)

    pending_inserts.execute()
if __name__ == "__main__":
    # Log records at INFO level and above go to stdout as bare messages.
    logging.basicConfig(format="%(message)s", level=logging.INFO, stream=sys.stdout)
    exec_sql()
表2 使用Python提交SQL作业参数说明

参数

说明

示例

file_path

“datagen2kafka.sql”文件路径,建议写全路径。需在二次开发样例代码中获取并上传至客户端指定目录。

说明:

当作业需要以yarn-application模式提交时,需替换如下路径:

file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"

file_path = "/客户端安装目录/Flink/flink/datagen2kafka.sql"

SQL示例:

create table kafka_sink ( -- Kafka-backed table that the INSERT below writes into
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) with (
  'connector' = 'kafka',
  'topic' = 'input2', -- Kafka topic backing this table
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup2',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
create TABLE datagen_source ( -- source table backed by the datagen connector, same columns as kafka_sink
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
INSERT INTO -- copy every row of the datagen source into the Kafka table
  kafka_sink
SELECT
  *
FROM
  datagen_source;