更新时间:2023-11-10 GMT+08:00
分享

Python样例代码

功能介绍

通过python API的方式提交Flink SQL作业到Yarn上。

代码样例

下面列出pyflink-sql.py的主要逻辑代码作为演示,在提交之前需要确保“file_path” 为要运行的SQL的路径,建议写全路径。

完整代码参见“flink-examples/pyflink-example/pyflink-sql”中的“pyflink-sql.py”。

import logging
import sys
import os
from pyflink.table import (EnvironmentSettings, TableEnvironment)
def read_sql(file_path):
    if not os.path.isfile(file_path):
        raise TypeError(file_path + " does not exist")
    all_the_text = open(file_path).read()
    return all_the_text
def exec_sql():
    # 提交之前修改SQL路径
    file_path = "datagen2kafka.sql"
    sql = read_sql(file_path)
    t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
    statement_set = t_env.create_statement_set()
    sqlArr = sql.split(";")
    for sqlStr in sqlArr:
        sqlStr = sqlStr.strip()
        if sqlStr.lower().startswith("create"):
            print("---------create---------------")
            print(sqlStr)
            t_env.execute_sql(sqlStr)
        if sqlStr.lower().startswith("insert"):
            print("---------insert---------------")
            print(sqlStr)
            statement_set.add_insert_sql(sqlStr)
    statement_set.execute()
if __name__ == '__main__':
    logging.basicConfig(stream=sys.stdout, level=logging.INFO, format="%(message)s")
    exec_sql()
表1 使用Python提交SQL作业参数说明

参数

说明

示例

file_path

“datagen2kafka.sql”文件路径,建议写全路径。需在二次样例代码中获取并上传至客户端指定目录。

说明:

当作业需要以yarn-application模式提交时,需替换如下路径:

file_path = os.getcwd() + "/../../../../yarnship/datagen2kafka.sql"

file_path = /客户端安装目录/Flink/flink/datagen2kafka.sql

SQL示例:

create table kafka_sink (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) with (
  'connector' = 'kafka',
  'topic' = 'input2',
  'properties.bootstrap.servers' = 'Kafka的Broker实例业务IP:Kafka端口号',
  'properties.group.id' = 'testGroup2',
  'scan.startup.mode' = 'latest-offset',
  'format' = 'json'
);
create TABLE datagen_source (
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  p varchar(20)
) WITH (
  'connector' = 'datagen',
  'rows-per-second' = '1'
);
INSERT INTO
  kafka_sink
SELECT
  *
FROM
  datagen_source;
分享:

    相关文档

    相关产品