Creating a Dumping Task
Function
This API is used to create a dumping task.
This API is out-of-date and may not be maintained in the future. Please use the API described in Enabling Smart Connect (Pay-per-Use Instance).
Call Method
For details, see How to Call an API.
URI
POST /v2/{project_id}/connectors/{connector_id}/sink-tasks
Parameter | Mandatory | Type | Description |
---|---|---|---|
project_id | Yes | String | Project ID. For details, see Obtaining a Project ID. |
connector_id | Yes | String | Instance dump ID. For details, see Querying Instance IDs. |
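The URI template can be filled in programmatically. The following is a minimal Python sketch, not part of the SDK: `sink_task_url` is a hypothetical helper name, and the host and IDs in the usage lines are placeholders. Path parameters are percent-encoded so that unexpected characters cannot change the path.

```python
from urllib.parse import quote


def sink_task_url(endpoint: str, project_id: str, connector_id: str) -> str:
    """Return the full URL of the create-dumping-task API (hypothetical helper)."""
    # Encode each path parameter on its own; safe='' also encodes '/'.
    return (
        f"https://{endpoint}/v2/{quote(project_id, safe='')}"
        f"/connectors/{quote(connector_id, safe='')}/sink-tasks"
    )


if __name__ == "__main__":
    # Placeholder values, for illustration only.
    print(sink_task_url("dms.example.com", "my-project-id", "my-connector-id"))
```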
Request Parameters
Parameter | Mandatory | Type | Description |
---|---|---|---|
source_type | Yes | String | Source data type. Currently, only BLOB is supported. |
task_name | Yes | String | Name of a dumping task. |
destination_type | Yes | String | Storage type. Currently, only OBS is supported. |
obs_destination_descriptor | Yes | ObsDestinationDescriptor object | Description of the dump. |
Parameters of ObsDestinationDescriptor
Parameter | Mandatory | Type | Description |
---|---|---|---|
topics | Yes | String | Names of the topics to be dumped. Multiple topics can be separated by commas (,). Regular expressions are supported. |
topics_regex | No | String | Regular expression of the topics to be dumped. Set either the topics_regex parameter or the topics parameter. Do not set both of them. Do not set it to ".*". |
consumer_strategy | Yes | String | Offset from which consumption starts. The default value is latest. |
destination_file_type | Yes | String | Dump file format. Currently, only TEXT is supported. |
access_key | Yes | String | Access key ID (AK). |
secret_key | Yes | String | Secret access key (SK). |
obs_bucket_name | Yes | String | Name of the OBS bucket used to store the data. |
obs_path | No | String | OBS path. The value can contain a maximum of 64 characters, including letters, digits, underscores (_), hyphens (-), and slashes (/). This parameter is left empty by default. |
partition_format | Yes | String | Directory structure of the object file written into OBS. The directory structure is in the format of yyyy/MM/dd/HH/mm (time at which the dump task was created). NOTE: After the data is dumped successfully, the storage directory structure is obs_bucket_path/file_prefix/partition_format. The default time zone is GMT+08:00. |
record_delimiter | No | String | Delimiter for the dump file, which separates the user data written into the dump file. Default value: newline (\n). |
deliver_time_interval | Yes | Integer | Dumping interval. No package files are generated if there is no data within an interval. Value range: 30 to 900. Unit: second. NOTE: This parameter is mandatory if streaming data is dumped to OBS. |
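The constraints listed for ObsDestinationDescriptor can be checked on the client side before a request is sent. The following is a minimal Python sketch under stated assumptions: `validate_descriptor` is a hypothetical helper and not part of the SDK, "letters" is read as ASCII letters, and the topics/topics_regex rule is read as "exactly one of the two is set". The service may apply further checks that are not documented here.

```python
import re

# Characters allowed in obs_path per the table (assumption: ASCII letters only).
_OBS_PATH_RE = re.compile(r"[A-Za-z0-9_/-]*")

# Fields marked as mandatory in the table, apart from topics.
_MANDATORY = (
    "consumer_strategy",
    "destination_file_type",
    "access_key",
    "secret_key",
    "obs_bucket_name",
    "partition_format",
)


def validate_descriptor(desc: dict) -> list:
    """Return a list of problems found; an empty list means none were found."""
    problems = []

    # topics and topics_regex are alternatives: set one, not both.
    if bool(desc.get("topics")) == bool(desc.get("topics_regex")):
        problems.append("set exactly one of topics and topics_regex")
    if desc.get("topics_regex") == ".*":
        problems.append('topics_regex must not be ".*"')

    # obs_path is optional, at most 64 characters from a restricted set.
    obs_path = desc.get("obs_path") or ""
    if len(obs_path) > 64 or not _OBS_PATH_RE.fullmatch(obs_path):
        problems.append("obs_path exceeds 64 characters or has invalid characters")

    # deliver_time_interval is an integer number of seconds in [30, 900].
    interval = desc.get("deliver_time_interval")
    if not isinstance(interval, int) or not 30 <= interval <= 900:
        problems.append("deliver_time_interval must be an integer from 30 to 900")

    for key in _MANDATORY:
        if not desc.get(key):
            problems.append(f"{key} is mandatory")
    return problems


if __name__ == "__main__":
    # Placeholder values, for illustration only.
    example = {
        "topics": "topic-test",
        "consumer_strategy": "earliest",
        "destination_file_type": "TEXT",
        "access_key": "XXXXXXXXXXXXXXXXXXXX",
        "secret_key": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
        "obs_bucket_name": "6666",
        "obs_path": "obsTransfer-1122976956",
        "partition_format": "yyyy/MM/dd/HH/mm",
        "deliver_time_interval": 300,
    }
    print(validate_descriptor(example))
```

Returning a list of problems instead of raising on the first one lets a caller report every issue in a single pass.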
Response Parameters
Status code: 200
Parameter | Type | Description |
---|---|---|
task_id | String | Task ID. |
Example Request
Creating a dumping task to dump data in topic-test to OBS
    POST https://{endpoint}/v2/{project_id}/connectors/{connector_id}/sink-tasks

    {
      "source_type" : "BLOB",
      "task_name" : "obsTransfer-1122976956",
      "destination_type" : "OBS",
      "obs_destination_descriptor" : {
        "consumer_strategy" : "earliest",
        "destination_file_type" : "TEXT",
        "access_key" : "XXXXXXXXXXXXXXXXXXXX",
        "secret_key" : "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
        "obs_bucket_name" : "6666",
        "obs_path" : "obsTransfer-1122976956",
        "partition_format" : "yyyy/MM/dd/HH/mm",
        "record_delimiter" : "",
        "deliver_time_interval" : 300,
        "topics" : "topic-test"
      }
    }
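For callers that do not use an SDK, the example request can be assembled with the Python standard library. The sketch below only builds the request object and does not send it. `build_create_sink_task_request` is a hypothetical helper, and the `X-Auth-Token` header is an assumption about token-based authentication; see How to Call an API for the authentication method that actually applies.

```python
import json
import urllib.request


def build_create_sink_task_request(url, token, task_name, descriptor):
    """Build (but do not send) a POST request that creates a dumping task."""
    body = {
        "source_type": "BLOB",        # only BLOB is supported
        "task_name": task_name,
        "destination_type": "OBS",    # only OBS is supported
        "obs_destination_descriptor": descriptor,
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Content-Type": "application/json",
            # Assumption: token authentication through this header.
            "X-Auth-Token": token,
        },
        method="POST",
    )


if __name__ == "__main__":
    # Placeholder URL, token, and descriptor, for illustration only.
    req = build_create_sink_task_request(
        "https://dms.example.com/v2/my-project-id/connectors/my-connector-id/sink-tasks",
        "my-token",
        "obsTransfer-1122976956",
        {"topics": "topic-test", "deliver_time_interval": 300},
    )
    print(req.get_method(), req.full_url)
    print(req.data.decode("utf-8"))
```

The resulting object can be passed to `urllib.request.urlopen` once real credentials and a real endpoint are in place.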
Example Response
Status code: 200
A dumping task is created successfully.
{ "task_id" : "2962882a-386c-4c9d-bb59-3b4f55d82961" }
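A caller typically needs the task_id from this response for later operations on the task. The following is a minimal Python sketch of reading it from a raw response; `extract_task_id` is a hypothetical helper, and treating every status code other than 200 as a failure is an assumption based on the status code table on this page.

```python
import json


def extract_task_id(status_code: int, body: str) -> str:
    """Return task_id from a create-dumping-task response, or raise on failure."""
    if status_code != 200:
        raise RuntimeError(f"unexpected status code {status_code}")
    payload = json.loads(body)
    task_id = payload.get("task_id")
    if not isinstance(task_id, str) or not task_id:
        raise ValueError("response does not contain a task_id")
    return task_id


if __name__ == "__main__":
    # Body copied from the example response above.
    print(extract_task_id(200, '{ "task_id" : "2962882a-386c-4c9d-bb59-3b4f55d82961" }'))
```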
SDK Sample Code
The SDK sample code is as follows.
Creating a dumping task to dump data in topic-test to OBS (Java)

    package com.huaweicloud.sdk.test;

    import com.huaweicloud.sdk.core.auth.ICredential;
    import com.huaweicloud.sdk.core.auth.BasicCredentials;
    import com.huaweicloud.sdk.core.exception.ConnectionException;
    import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
    import com.huaweicloud.sdk.core.exception.ServiceResponseException;
    import com.huaweicloud.sdk.kafka.v2.region.KafkaRegion;
    import com.huaweicloud.sdk.kafka.v2.*;
    import com.huaweicloud.sdk.kafka.v2.model.*;

    public class CreateSinkTaskSolution {

        public static void main(String[] args) {
            // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks.
            // It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables
            // and decrypted during use to ensure security.
            // In this example, AK and SK are stored in environment variables for authentication. Before running this
            // example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
            String ak = System.getenv("CLOUD_SDK_AK");
            String sk = System.getenv("CLOUD_SDK_SK");

            ICredential auth = new BasicCredentials()
                    .withAk(ak)
                    .withSk(sk);

            KafkaClient client = KafkaClient.newBuilder()
                    .withCredential(auth)
                    .withRegion(KafkaRegion.valueOf("<YOUR REGION>"))
                    .build();
            CreateSinkTaskRequest request = new CreateSinkTaskRequest();
            CreateSinkTaskReq body = new CreateSinkTaskReq();
            ObsDestinationDescriptor obsDestinationDescriptorbody = new ObsDestinationDescriptor();
            obsDestinationDescriptorbody.withTopics("topic-test")
                .withConsumerStrategy(ObsDestinationDescriptor.ConsumerStrategyEnum.fromValue("earliest"))
                .withDestinationFileType(ObsDestinationDescriptor.DestinationFileTypeEnum.fromValue("TEXT"))
                .withAccessKey("XXXXXXXXXXXXXXXXXXXX")
                .withSecretKey("XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX")
                .withObsBucketName("6666")
                .withObsPath("obsTransfer-1122976956")
                .withPartitionFormat(ObsDestinationDescriptor.PartitionFormatEnum.fromValue("yyyy/MM/dd/HH/mm"))
                .withRecordDelimiter("")
                .withDeliverTimeInterval(300);
            body.withObsDestinationDescriptor(obsDestinationDescriptorbody);
            body.withDestinationType(CreateSinkTaskReq.DestinationTypeEnum.fromValue("OBS"));
            body.withTaskName("obsTransfer-1122976956");
            body.withSourceType(CreateSinkTaskReq.SourceTypeEnum.fromValue("BLOB"));
            request.withBody(body);
            try {
                CreateSinkTaskResponse response = client.createSinkTask(request);
                System.out.println(response.toString());
            } catch (ConnectionException e) {
                e.printStackTrace();
            } catch (RequestTimeoutException e) {
                e.printStackTrace();
            } catch (ServiceResponseException e) {
                e.printStackTrace();
                System.out.println(e.getHttpStatusCode());
                System.out.println(e.getRequestId());
                System.out.println(e.getErrorCode());
                System.out.println(e.getErrorMsg());
            }
        }
    }
Creating a dumping task to dump data in topic-test to OBS (Python)

    # coding: utf-8

    import os

    from huaweicloudsdkcore.auth.credentials import BasicCredentials
    from huaweicloudsdkkafka.v2.region.kafka_region import KafkaRegion
    from huaweicloudsdkcore.exceptions import exceptions
    from huaweicloudsdkkafka.v2 import *

    if __name__ == "__main__":
        # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks.
        # It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables
        # and decrypted during use to ensure security.
        # In this example, AK and SK are stored in environment variables for authentication. Before running this
        # example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak = os.getenv("CLOUD_SDK_AK")
        sk = os.getenv("CLOUD_SDK_SK")

        credentials = BasicCredentials(ak, sk)

        client = KafkaClient.new_builder() \
            .with_credentials(credentials) \
            .with_region(KafkaRegion.value_of("<YOUR REGION>")) \
            .build()

        try:
            request = CreateSinkTaskRequest()
            obsDestinationDescriptorbody = ObsDestinationDescriptor(
                topics="topic-test",
                consumer_strategy="earliest",
                destination_file_type="TEXT",
                access_key="XXXXXXXXXXXXXXXXXXXX",
                secret_key="XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
                obs_bucket_name="6666",
                obs_path="obsTransfer-1122976956",
                partition_format="yyyy/MM/dd/HH/mm",
                record_delimiter="",
                deliver_time_interval=300
            )
            request.body = CreateSinkTaskReq(
                obs_destination_descriptor=obsDestinationDescriptorbody,
                destination_type="OBS",
                task_name="obsTransfer-1122976956",
                source_type="BLOB"
            )
            response = client.create_sink_task(request)
            print(response)
        except exceptions.ClientRequestException as e:
            print(e.status_code)
            print(e.request_id)
            print(e.error_code)
            print(e.error_msg)
Creating a dumping task to dump data in topic-test to OBS (Go)

    package main

    import (
        "fmt"
        "os"

        "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
        kafka "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2"
        "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/model"
        region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/kafka/v2/region"
    )

    func main() {
        // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks.
        // It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables
        // and decrypted during use to ensure security.
        // In this example, AK and SK are stored in environment variables for authentication. Before running this
        // example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment
        ak := os.Getenv("CLOUD_SDK_AK")
        sk := os.Getenv("CLOUD_SDK_SK")

        auth := basic.NewCredentialsBuilder().
            WithAk(ak).
            WithSk(sk).
            Build()

        client := kafka.NewKafkaClient(
            kafka.KafkaClientBuilder().
                WithRegion(region.ValueOf("<YOUR REGION>")).
                WithCredential(auth).
                Build())

        request := &model.CreateSinkTaskRequest{}
        obsPathObsDestinationDescriptor := "obsTransfer-1122976956"
        recordDelimiterObsDestinationDescriptor := ""
        obsDestinationDescriptorbody := &model.ObsDestinationDescriptor{
            Topics:              "topic-test",
            ConsumerStrategy:    model.GetObsDestinationDescriptorConsumerStrategyEnum().EARLIEST,
            DestinationFileType: model.GetObsDestinationDescriptorDestinationFileTypeEnum().TEXT,
            AccessKey:           "XXXXXXXXXXXXXXXXXXXX",
            SecretKey:           "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
            ObsBucketName:       "6666",
            ObsPath:             &obsPathObsDestinationDescriptor,
            PartitionFormat:     model.GetObsDestinationDescriptorPartitionFormatEnum().YYYY_MM_DD_HH_MM,
            RecordDelimiter:     &recordDelimiterObsDestinationDescriptor,
            DeliverTimeInterval: int32(300),
        }
        request.Body = &model.CreateSinkTaskReq{
            ObsDestinationDescriptor: obsDestinationDescriptorbody,
            DestinationType:          model.GetCreateSinkTaskReqDestinationTypeEnum().OBS,
            TaskName:                 "obsTransfer-1122976956",
            SourceType:               model.GetCreateSinkTaskReqSourceTypeEnum().BLOB,
        }
        response, err := client.CreateSinkTask(request)
        if err == nil {
            fmt.Printf("%+v\n", response)
        } else {
            fmt.Println(err)
        }
    }
Status Code
Status Code | Description |
---|---|
200 | A dumping task is created successfully. |
Error Codes
See Error Codes.