更新时间:2024-03-15 GMT+08:00

查询单个转储任务

功能介绍

查询单个转储任务。

当前页面API为历史版本API,未来可能停止维护。请使用查询Smart Connector任务详情

调用方法

请参见如何调用API

URI

GET /v2/{project_id}/connectors/{connector_id}/sink-tasks/{task_id}

表1 路径参数

参数

是否必选

参数类型

描述

project_id

String

项目ID,获取方式请参见获取项目ID

connector_id

String

实例转储ID。

请参考查询实例返回的数据。

task_id

String

转储任务ID。

表2 Query参数

参数

是否必选

参数类型

描述

topic-info

String

是否包含topic信息。默认是false。

请求参数

响应参数

状态码: 200

表3 响应Body参数

参数

参数类型

描述

task_name

String

转储任务名称。

destination_type

String

转储任务类型。

create_time

Long

转储任务创建时间戳。

status

String

转储任务状态。

topics

String

返回任务转存的topics列表或者正则表达式。

obs_destination_descriptor

obs_destination_descriptor object

转存目标的描述。

topics_info

Array of topics_info objects

topic信息。

表4 obs_destination_descriptor

参数

参数类型

描述

consumer_strategy

String

消费启动策略:

  • latest:从Topic最后端开始消费。
  • earliest: 从Topic最前端消息开始消费。

默认是latest。

destination_file_type

String

转储文件格式。目前只支持text格式。

obs_bucket_name

String

存储该通道数据的OBS桶名称。

obs_path

String

存储在obs的路径。

partition_format

String

将转储文件的生成时间使用“yyyy/MM/dd/HH/mm”格式生成分区字符串,用来定义写到OBS的Object文件所在的目录层次结构。

  • yyyy:年
  • yyyy/MM:年/月
  • yyyy/MM/dd:年/月/日
  • yyyy/MM/dd/HH:年/月/日/时
  • yyyy/MM/dd/HH/mm:年/月/日/时/分,例如:2017/11/10/14/49,目录结构就是“2017 > 11 > 10 > 14 > 49”,“2017”表示最外层文件夹。
说明:

数据转储成功后,存储的目录结构为“obs_bucket_path/file_prefix/partition_format”。默认时间是GMT+8 时间

record_delimiter

String

转储文件的记录分隔符,用于分隔写入转储文件的用户数据。

取值范围:

  • 逗号“,”
  • 分号“;”
  • 竖线“|”
  • 换行符“\n”
  • NULL

默认值:换行符“\n”。

deliver_time_interval

Integer

根据用户配置的时间,周期性的将数据导入OBS,若某个时间段内无数据,则此时间段不会生成打包文件。

取值范围:30~900,缺省值:300,单位:秒。

说明:

使用OBS通道转储流式数据时该参数为必选配置。

obs_part_size

Long

每个传输文件多大后就开始上传,单位为byte。

默认值5242880。

表5 topics_info

参数

参数类型

描述

topic

String

topic名称。

partitions

Array of partitions objects

分区列表。

表6 partitions

参数

参数类型

描述

partition_id

String

分区ID。

status

String

运行状态。

last_transfer_offset

String

已转储的消息偏移量。

log_end_offset

String

消息偏移量。

lag

String

积压的消息数。

请求示例

查询指定的转储任务详情。

GET https://{endpoint}/v2/{project_id}/connectors/{connector_id}/sink-tasks/{task_id}?topic-info=true

响应示例

状态码: 200

查询单个转储任务成功。

{
  "task_name" : "obsTransfer-56997523",
  "destination_type" : "OBS",
  "create_time" : 1628126621283,
  "status" : "RUNNING",
  "topics" : "topic-sdk-no-delete",
  "obs_destination_descriptor" : {
    "consumer_strategy" : "earliest",
    "destination_file_type" : "TEXT",
    "obs_bucket_name" : "testobs",
    "obs_path" : "obsTransfer-56997523",
    "partition_format" : "yyyy/MM/dd/HH/mm",
    "record_delimiter" : "",
    "deliver_time_interval" : 300,
    "obs_part_size" : 5242880
  },
  "topics_info" : [ {
    "topic" : "topic-sdk-no-delete",
    "partitions" : [ {
      "partition_id" : "2",
      "status" : "RUNNING",
      "last_transfer_offset" : "3",
      "log_end_offset" : "3",
      "lag" : "0"
    }, {
      "partition_id" : "1",
      "status" : "RUNNING",
      "last_transfer_offset" : "3",
      "log_end_offset" : "3",
      "lag" : "0"
    }, {
      "partition_id" : "0",
      "status" : "RUNNING",
      "last_transfer_offset" : "3",
      "log_end_offset" : "3",
      "lag" : "0"
    } ]
  } ]
}

SDK代码示例

SDK代码示例如下。

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package com.huaweicloud.sdk.test;

import com.huaweicloud.sdk.core.auth.ICredential;
import com.huaweicloud.sdk.core.auth.BasicCredentials;
import com.huaweicloud.sdk.core.exception.ConnectionException;
import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
import com.huaweicloud.sdk.core.exception.ServiceResponseException;
import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
import com.huaweicloud.sdk.kafka.v2.*;
import com.huaweicloud.sdk.kafka.v2.model.*;


public class ShowSinkTaskDetailSolution {

    public static void main(String[] args) {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        String ak = System.getenv("CLOUD_SDK_AK");
        String sk = System.getenv("CLOUD_SDK_SK");

        ICredential auth = new BasicCredentials()
                .withAk(ak)
                .withSk(sk);

        KafkaClient client = KafkaClient.newBuilder()
                .withCredential(auth)
                .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                .build();
        ShowSinkTaskDetailRequest request = new ShowSinkTaskDetailRequest();
        request.withTopicInfo(ShowSinkTaskDetailRequest.TopicInfoEnum.fromValue("<topic-info>"));
        try {
            ShowSinkTaskDetailResponse response = client.showSinkTaskDetail(request);
            System.out.println(response.toString());
        } catch (ConnectionException e) {
            e.printStackTrace();
        } catch (RequestTimeoutException e) {
            e.printStackTrace();
        } catch (ServiceResponseException e) {
            e.printStackTrace();
            System.out.println(e.getHttpStatusCode());
            System.out.println(e.getRequestId());
            System.out.println(e.getErrorCode());
            System.out.println(e.getErrorMsg());
        }
    }
}
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
# coding: utf-8

from huaweicloudsdkcore.auth.credentials import BasicCredentials
from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
from huaweicloudsdkcore.exceptions import exceptions
from huaweicloudsdkkafka.v2 import *

if __name__ == "__main__":
    # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak = __import__('os').getenv("CLOUD_SDK_AK")
    sk = __import__('os').getenv("CLOUD_SDK_SK")

    credentials = BasicCredentials(ak, sk) \

    client = KafkaClient.new_builder() \
        .with_credentials(credentials) \
        .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
        .build()

    try:
        request = ShowSinkTaskDetailRequest()
        request.topic_info = "<topic-info>"
        response = client.show_sink_task_detail(request)
        print(response)
    except exceptions.ClientRequestException as e:
        print(e.status_code)
        print(e.request_id)
        print(e.error_code)
        print(e.error_msg)
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package main

import (
	"fmt"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
    kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
	"github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
    region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
)

func main() {
    // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security.
    // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
    ak := os.Getenv("CLOUD_SDK_AK")
    sk := os.Getenv("CLOUD_SDK_SK")

    auth := basic.NewCredentialsBuilder().
        WithAk(ak).
        WithSk(sk).
        Build()

    client := kafka.NewKafkaClient(
        kafka.KafkaClientBuilder().
            WithRegion(region.ValueOf("<YOUR REGION>")).
            WithCredential(auth).
            Build())

    request := &model.ShowSinkTaskDetailRequest{}
	topicInfoRequest:= model.GetShowSinkTaskDetailRequestTopicInfoEnum().<TOPIC_INFO>
	request.TopicInfo = &topicInfoRequest
	response, err := client.ShowSinkTaskDetail(request)
	if err == nil {
        fmt.Printf("%+v\n", response)
    } else {
        fmt.Println(err)
    }
}

状态码

状态码

描述

200

查询单个转储任务成功。

错误码

请参见错误码