更新时间:2025-06-11 GMT+08:00
分享

使用SDK提交Flink SQL作业

本节操作介绍使用DLI SDK V2提交Flink SQL作业的操作方法。

2024年5月起,新用户可以直接使用DLI SDK V2,无需开通白名单。

对于2024年5月之前开通并使用DLI服务的用户,如需使用“DLI SDK V2”功能,必须提交工单申请加入白名单。

前提条件

操作前准备

获取AK/SK,项目ID及对应的Region信息。

  1. 管理控制台。
  2. 单击界面右上角的登录用户名,在下拉列表中单击“我的凭证”。
  3. 在左侧导航栏中选择“访问密钥”,单击“新增访问密钥”。根据提示输入对应信息,单击“确定”。
  4. 在弹出的提示页面单击“立即下载”。下载成功后,打开凭证文件,获取AK/SK信息。
  5. 左侧导航栏单击“API凭证”,在“项目列表”中获取“项目ID”即为project_id值,对应的“项目”即为region的值。

样例代码

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

def main():
    your_access_key = os.getenv("HUAWEICLOUD_SDK_AK")
    your_secret_key = os.getenv("HUAWEICLOUD_SDK_SK")
    project_id = "your_project_id"
    region_name = "region_name"  
    credentials = BasicCredentials(your_access_key, your_secret_key, project_id)
    dli_client = DliClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(DliRegion.value_of(region_name)) \
        .build()

    try:
        # 步骤一:创建Flink作业。作业状态将变成 “草稿”
        job_id = create_flink_sql_job(dli_client, "your_queue_name")
        logger.info("job_id: %d", job_id)

        # 步骤二:运行作业。作业状态将从 “草稿” 变成 “提交中”
        resps = batch_run_flink_sql_jobs(dli_client, [job_id])
        logger.info("Response: %s", resps)

        # 步骤三:查询作业状态。如果您想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。
        check_running(dli_client, job_id)

    except exceptions.ClientRequestException as e:
        # 请根据业务实际情况处理异常信息,此处仅作样例。
        logger.error("Failed to execute job:", e)

创建Flink SQL作业

  • 功能介绍:

    用于创建Flink SQL作业。

  • 相关链接:

    关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_sql_job。新建Flink SQL作业

    创建 Flink SQL作业。作业状态将变成 “草稿”

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    def create_flink_sql_job(client, queue_name):
        """
        创建Flink作业。作业状态将变成 “草稿”
        关键SDK API:huaweicloudsdkdli.v1.dli_client.DliClient.create_flink_sql_job。详见 CreateFlinkSqlJob
    
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param str queue_name: Queue name for the job to execute
        :return: int
        """
        request_body = CreateFlinkSqlJobRequestBody(
            name="your_job_name",  # 作业名称,名字必须唯一,比如 flink_jar_job_demo。长度限制:1-57个字符。
            desc="your flink job's description",  # 用户自定义描述。长度限制:0-512个字符。
            sql_body="""create table orders(
                  name string,
                  num INT
                ) with (
                  'connector' = 'datagen',
                  'rows-per-second' = '1', 
                  'fields.name.kind' = 'random', 
                  'fields.name.length' = '5'
                );
                CREATE TABLE sink_table (
                   name string,
                   num INT
                ) WITH (
                   'connector' = 'print'
                );
                INSERT into sink_table SELECT * from orders;""",
            # 自定义 Stream SQL语句,至少包含source, query, sink三个部分。长度限制:1024*1024个字符。
            # 本SQL示例:自动生成随机source数据,并打印到控制台。
            queue_name=queue_name,   # 通用队列名称。队列名称。长度限制:0-128个字符。
            run_mode="exclusive_cluster",  # 作业运行模式。只支持 exclusive_cluster 独享模式。
            log_enabled=True,  # 开启作业的日志上传到用户的OBS功能
            obs_bucket="your_obs_bucket_name",  # OBS桶名。用于保存 日志 和 checkpoint数据
            job_type="flink_opensource_sql_job",  # 作业类型。建议选择: "flink_opensource_sql_job"
            flink_version="1.12"  # 指定Flink版本
        )
        request = CreateFlinkSqlJobRequest(body=request_body)
        response = client.create_flink_sql_job(request)
        return response.job.job_id
    

批量运行Flink作业

  • 功能介绍:

    批量运行Flink SQL作业。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.batch_run_flink_jobs。批量运行Flink作业

    批量运行Flink作业,作业状态将从 “草稿” 变成 “提交中”。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    def batch_run_flink_sql_jobs(client, job_ids):
        """
        运行作业。作业状态将从 “草稿” 变成 “提交中”    
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param list[int] job_ids: The job ids for running.
        :return: The body of this BatchRunFlinkJobsResponse.
        :rtype: list[:class:`huaweicloudsdkdli.v1.FlinkSuccessResponse`]
        """
        request_body = BatchRunFlinkJobsRequestBody(job_ids=job_ids)
        request = BatchRunFlinkJobsRequest(body=request_body)
        response = client.batch_run_flink_jobs(request)
        return response.body
    

查询作业状态

  • 功能介绍:

    查询Flink SQL作业状态。

  • 相关链接:

    关键SDK API:

    huaweicloudsdkdli.v1.dli_client.DliClient.show_flink_job。查询Flink作业详情

    如果想在当前线程等待作业进入“运行中”状态,可循环检查状态,直到作业进入“运行中”状态。

  • 示例代码
     1
     2
     3
     4
     5
     6
     7
     8
     9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    def check_running(client, job_id):
        """
        如果您想在当前线程等待作业进入“运行中”状态,可执行此方法,循环检查状态,直到作业进入“运行中”状态。
        
        :param huaweicloudsdkdli.v1.dli_client.DliClient client: DLI Client.
        :param int job_id: The job id for getting status.
        :return:
        """
        while True:
            try:
                request = ShowFlinkJobRequest(job_id=job_id)
                response = client.show_flink_job(request)
            except exceptions.ClientRequestException as e:
                raise Exception(f"Failed to get Flink sql job status by id: {job_id}") from e
    
            status = response.job_detail.status
            logger.info("FlinkSqlJob id %d status: %s", job_id, status)
    
            if status == "job_running":
                return
            if status in ["job_submit_fail", "job_running_exception"]:
                raise Exception(f"Run Flink sql job failed: {response}")
    
            time.sleep(1)
    

相关文档