Adding OBS Dump Tasks
Function
This API is used to add OBS dump tasks.
Calling Method
For details, see Calling APIs.
URI
POST /v2/{project_id}/streams/{stream_name}/transfer-tasks
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
project_id |
Yes |
String |
Project ID |
stream_name |
Yes |
String |
Name of the stream Maximum: 60 |
Request Parameters
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
X-Auth-Token |
Yes |
String |
User token. It can be obtained by calling the IAM API used to obtain a user token. The value of X-Subject-Token in the response header is the user token. |
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
destination_type |
Yes |
String |
Type of the dump task.
Default: NOWHERE Enumeration values:
|
obs_destination_descriptor |
No |
Parameter list of OBS to which data in the DIS stream will be dumped |
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
task_name |
Yes |
String |
Name of the dump task. The task name consists of letters, digits, hyphens (-), and underscores (_). It contains 1 to 64 characters. |
agency_name |
Yes |
String |
Name of the agency created on IAM. DIS uses an agency to access your specified resources. Agency parameter settings:- Agency Type: Cloud service- Cloud Service: DIS- Validity Period: Unlimited- Set Policy to Tenant Administrator on the OBS project in the Global service region.If agencies are available, you can use an IAM API to obtain the available agencies.This parameter cannot be left unspecified and the parameter value cannot exceed 64 characters.If there are dump tasks on the console, the system displays a message indicating that an agency will be automatically created. The name of the automatically created agency is dis_admin_agency. Maximum: 64 |
deliver_time_interval |
Yes |
Integer |
User-defined interval at which data is imported from the current DIS stream into OBS. If no data is pushed to the DIS stream during the current interval, no dump file package will be generated. Unit: second Minimum: 30 Maximum: 900 Default: 300 |
consumer_strategy |
No |
String |
Offset.
Default: LATEST Enumeration values:
|
file_prefix |
No |
String |
Directory to store files that will be dumped to OBS. Different directory levels are separated by slashes (/) and cannot start with slashes. The value can contain a maximum of 50 characters, including letters, digits, underscores (_), and slashes (/). This parameter is left blank by default. Maximum: 50 |
partition_format |
No |
String |
Directory structure of the object file written into OBS. The directory structure is in the format of yyyy/MM/dd/HH/mm (time at which the dump task was created).- N/A: If this parameter is left blank, the time directory format will not be used.- yyyy: year.- yyyy/MM: year and month.- yyyy/MM/dd: year, month, and day.- yyyy/MM/dd/HH: year, month, day, and hour.- yyyy/MM/dd/HH/mm: year, month, day, hour, and minute.For example, if the dump task was created at 14:49 on November 10, 2017, then the directory structure is 2017 > 11 > 10 > 14 > 49.Default value: emptyNote:After the data is dumped successfully, the storage directory structure is obs_bucket_path/file_prefix/partition_format. Enumeration values:
|
obs_bucket_path |
Yes |
String |
Name of the OBS bucket used to store the stream data |
destination_file_type |
No |
String |
Dump file format.
Note: The parquet or carbon format can be selected only when Source Data Type is set to JSON and Dump Destination is set to OBS. Default: text Enumeration values:
|
processing_schema |
No |
ProcessingSchema object |
Dump time directory generated based on the timestamp of the source data and the configured partition_format. Directory structure of the object file written into OBS. The directory structure is in the format of yyyy/MM/dd/HH/mm. |
record_delimiter |
No |
String |
Delimiter for the dump file, which is used to separate the user data that is written into the dump file. Options:
Default: \n |
Parameter |
Mandatory |
Type |
Description |
---|---|---|---|
timestamp_name |
Yes |
String |
Attribute name of the source data timestamp |
timestamp_type |
Yes |
String |
Type of the source data timestamp.
|
timestamp_format |
No |
String |
OBS directory generated based on the timestamp format. This parameter is mandatory when the timestamp type of the source data is String. Enumeration values:
|
Response Parameters
None
Example Requests
-
Adding OBS Dump Tasks
POST https://{Endpoint}/v2/{project_id}/streams/{stream_name}/transfer-tasks { "destination_type" : "OBS", "obs_destination_descriptor" : { "task_name" : "newtask", "consumer_strategy" : "LATEST", "agency_name" : "dis_admin_agency", "destination_file_type" : "text", "obs_bucket_path" : "obsbucket", "file_prefix" : "", "partition_format" : "yyyy/MM/dd/HH/mm", "record_delimiter" : "|", "deliver_time_interval" : 30 } }
-
Adding OBS Dump Tasks (The dump file format is Parquet.)
POST https://{Endpoint}/v2/{project_id}/streams/{stream_name}/transfer-tasks { "destination_type" : "OBS", "obs_destination_descriptor" : { "task_name" : "newtask", "consumer_strategy" : "LATEST", "agency_name" : "dis_admin_agency", "destination_file_type" : "parquet", "obs_bucket_path" : "obsbucket", "file_prefix" : "", "partition_format" : "yyyy/MM/dd/HH/mm", "record_delimiter" : "|", "deliver_time_interval" : 30 } }
Example Responses
None
SDK Sample Code
The SDK sample code is as follows.
Java
-
Adding OBS Dump Tasks
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.dis.v2.region.DisRegion; import com.huaweicloud.sdk.dis.v2.*; import com.huaweicloud.sdk.dis.v2.model.*; public class CreateObsTransferTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); ICredential auth = new BasicCredentials() .withAk(ak) .withSk(sk); DisClient client = DisClient.newBuilder() .withCredential(auth) .withRegion(DisRegion.valueOf("<YOUR REGION>")) .build(); CreateObsTransferTaskRequest request = new CreateObsTransferTaskRequest(); CreateTransferTaskReq body = new CreateTransferTaskReq(); OBSDestinationDescriptorRequest obsDestinationDescriptorbody = new OBSDestinationDescriptorRequest(); obsDestinationDescriptorbody.withTaskName("newtask") .withObsBucketPath("obsbucket") .withAgencyName("dis_admin_agency") .withDestinationFileType(OBSDestinationDescriptorRequest.DestinationFileTypeEnum.fromValue("text")) .withRecordDelimiter("|") .withDeliverTimeInterval(30) .withFilePrefix("") .withPartitionFormat(OBSDestinationDescriptorRequest.PartitionFormatEnum.fromValue("yyyy/MM/dd/HH/mm")) .withConsumerStrategy(OBSDestinationDescriptorRequest.ConsumerStrategyEnum.fromValue("LATEST")); body.withObsDestinationDescriptor(obsDestinationDescriptorbody); body.withDestinationType(CreateTransferTaskReq.DestinationTypeEnum.fromValue("OBS")); request.withBody(body); try { CreateObsTransferTaskResponse response = client.createObsTransferTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
-
Adding OBS Dump Tasks (The dump file format is Parquet.)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59
package com.huaweicloud.sdk.test; import com.huaweicloud.sdk.core.auth.ICredential; import com.huaweicloud.sdk.core.auth.BasicCredentials; import com.huaweicloud.sdk.core.exception.ConnectionException; import com.huaweicloud.sdk.core.exception.RequestTimeoutException; import com.huaweicloud.sdk.core.exception.ServiceResponseException; import com.huaweicloud.sdk.dis.v2.region.DisRegion; import com.huaweicloud.sdk.dis.v2.*; import com.huaweicloud.sdk.dis.v2.model.*; public class CreateObsTransferTaskSolution { public static void main(String[] args) { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment String ak = System.getenv("CLOUD_SDK_AK"); String sk = System.getenv("CLOUD_SDK_SK"); ICredential auth = new BasicCredentials() .withAk(ak) .withSk(sk); DisClient client = DisClient.newBuilder() .withCredential(auth) .withRegion(DisRegion.valueOf("<YOUR REGION>")) .build(); CreateObsTransferTaskRequest request = new CreateObsTransferTaskRequest(); CreateTransferTaskReq body = new CreateTransferTaskReq(); OBSDestinationDescriptorRequest obsDestinationDescriptorbody = new OBSDestinationDescriptorRequest(); obsDestinationDescriptorbody.withTaskName("newtask") .withObsBucketPath("obsbucket") .withAgencyName("dis_admin_agency") .withDestinationFileType(OBSDestinationDescriptorRequest.DestinationFileTypeEnum.fromValue("parquet")) .withRecordDelimiter("|") .withDeliverTimeInterval(30) .withFilePrefix("") .withPartitionFormat(OBSDestinationDescriptorRequest.PartitionFormatEnum.fromValue("yyyy/MM/dd/HH/mm")) .withConsumerStrategy(OBSDestinationDescriptorRequest.ConsumerStrategyEnum.fromValue("LATEST")); body.withObsDestinationDescriptor(obsDestinationDescriptorbody); body.withDestinationType(CreateTransferTaskReq.DestinationTypeEnum.fromValue("OBS")); request.withBody(body); try { CreateObsTransferTaskResponse response = client.createObsTransferTask(request); System.out.println(response.toString()); } catch (ConnectionException e) { e.printStackTrace(); } catch (RequestTimeoutException e) { e.printStackTrace(); } catch (ServiceResponseException e) { e.printStackTrace(); System.out.println(e.getHttpStatusCode()); System.out.println(e.getRequestId()); System.out.println(e.getErrorCode()); System.out.println(e.getErrorMsg()); } } }
Python
-
Adding OBS Dump Tasks
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
# coding: utf-8 import os from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkdis.v2.region.dis_region import DisRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkdis.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] credentials = BasicCredentials(ak, sk) client = DisClient.new_builder() \ .with_credentials(credentials) \ .with_region(DisRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateObsTransferTaskRequest() obsDestinationDescriptorbody = OBSDestinationDescriptorRequest( task_name="newtask", obs_bucket_path="obsbucket", agency_name="dis_admin_agency", destination_file_type="text", record_delimiter="|", deliver_time_interval=30, file_prefix="", partition_format="yyyy/MM/dd/HH/mm", consumer_strategy="LATEST" ) request.body = CreateTransferTaskReq( obs_destination_descriptor=obsDestinationDescriptorbody, destination_type="OBS" ) response = client.create_obs_transfer_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
-
Adding OBS Dump Tasks (The dump file format is Parquet.)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
# coding: utf-8 import os from huaweicloudsdkcore.auth.credentials import BasicCredentials from huaweicloudsdkdis.v2.region.dis_region import DisRegion from huaweicloudsdkcore.exceptions import exceptions from huaweicloudsdkdis.v2 import * if __name__ == "__main__": # The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. # In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak = os.environ["CLOUD_SDK_AK"] sk = os.environ["CLOUD_SDK_SK"] credentials = BasicCredentials(ak, sk) client = DisClient.new_builder() \ .with_credentials(credentials) \ .with_region(DisRegion.value_of("<YOUR REGION>")) \ .build() try: request = CreateObsTransferTaskRequest() obsDestinationDescriptorbody = OBSDestinationDescriptorRequest( task_name="newtask", obs_bucket_path="obsbucket", agency_name="dis_admin_agency", destination_file_type="parquet", record_delimiter="|", deliver_time_interval=30, file_prefix="", partition_format="yyyy/MM/dd/HH/mm", consumer_strategy="LATEST" ) request.body = CreateTransferTaskReq( obs_destination_descriptor=obsDestinationDescriptorbody, destination_type="OBS" ) response = client.create_obs_transfer_task(request) print(response) except exceptions.ClientRequestException as e: print(e.status_code) print(e.request_id) print(e.error_code) print(e.error_msg)
Go
-
Adding OBS Dump Tasks
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" dis "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). Build() client := dis.NewDisClient( dis.DisClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateObsTransferTaskRequest{} destinationFileTypeObsDestinationDescriptor:= model.GetObsDestinationDescriptorRequestDestinationFileTypeEnum().TEXT recordDelimiterObsDestinationDescriptor:= "|" filePrefixObsDestinationDescriptor:= "" partitionFormatObsDestinationDescriptor:= model.GetObsDestinationDescriptorRequestPartitionFormatEnum().YYYY_MM_DD_HH_MM consumerStrategyObsDestinationDescriptor:= model.GetObsDestinationDescriptorRequestConsumerStrategyEnum().LATEST obsDestinationDescriptorbody := &model.ObsDestinationDescriptorRequest{ TaskName: "newtask", ObsBucketPath: "obsbucket", AgencyName: "dis_admin_agency", DestinationFileType: &destinationFileTypeObsDestinationDescriptor, RecordDelimiter: &recordDelimiterObsDestinationDescriptor, DeliverTimeInterval: int32(30), FilePrefix: &filePrefixObsDestinationDescriptor, PartitionFormat: &partitionFormatObsDestinationDescriptor, ConsumerStrategy: &consumerStrategyObsDestinationDescriptor, } request.Body = &model.CreateTransferTaskReq{ ObsDestinationDescriptor: obsDestinationDescriptorbody, DestinationType: model.GetCreateTransferTaskReqDestinationTypeEnum().OBS, } response, err := client.CreateObsTransferTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
-
Adding OBS Dump Tasks (The dump file format is Parquet.)
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
package main import ( "fmt" "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic" dis "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2" "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/model" region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/region" ) func main() { // The AK and SK used for authentication are hard-coded or stored in plaintext, which has great security risks. It is recommended that the AK and SK be stored in ciphertext in configuration files or environment variables and decrypted during use to ensure security. // In this example, AK and SK are stored in environment variables for authentication. Before running this example, set environment variables CLOUD_SDK_AK and CLOUD_SDK_SK in the local environment ak := os.Getenv("CLOUD_SDK_AK") sk := os.Getenv("CLOUD_SDK_SK") auth := basic.NewCredentialsBuilder(). WithAk(ak). WithSk(sk). Build() client := dis.NewDisClient( dis.DisClientBuilder(). WithRegion(region.ValueOf("<YOUR REGION>")). WithCredential(auth). Build()) request := &model.CreateObsTransferTaskRequest{} destinationFileTypeObsDestinationDescriptor:= model.GetObsDestinationDescriptorRequestDestinationFileTypeEnum().PARQUET recordDelimiterObsDestinationDescriptor:= "|" filePrefixObsDestinationDescriptor:= "" partitionFormatObsDestinationDescriptor:= model.GetObsDestinationDescriptorRequestPartitionFormatEnum().YYYY_MM_DD_HH_MM consumerStrategyObsDestinationDescriptor:= model.GetObsDestinationDescriptorRequestConsumerStrategyEnum().LATEST obsDestinationDescriptorbody := &model.ObsDestinationDescriptorRequest{ TaskName: "newtask", ObsBucketPath: "obsbucket", AgencyName: "dis_admin_agency", DestinationFileType: &destinationFileTypeObsDestinationDescriptor, RecordDelimiter: &recordDelimiterObsDestinationDescriptor, DeliverTimeInterval: int32(30), FilePrefix: &filePrefixObsDestinationDescriptor, PartitionFormat: &partitionFormatObsDestinationDescriptor, ConsumerStrategy: &consumerStrategyObsDestinationDescriptor, } request.Body = &model.CreateTransferTaskReq{ ObsDestinationDescriptor: obsDestinationDescriptorbody, DestinationType: model.GetCreateTransferTaskReqDestinationTypeEnum().OBS, } response, err := client.CreateObsTransferTask(request) if err == nil { fmt.Printf("%+v\n", response) } else { fmt.Println(err) } }
More
For SDK sample code of more programming languages, see the Sample Code tab in API Explorer. SDK sample code can be automatically generated.
Status Codes
Status Code |
Description |
---|---|
201 |
Normal response |
Error Codes
See Error Codes.
Feedback
Was this page helpful?
Provide feedbackThank you very much for your feedback. We will continue working to improve the documentation.See the reply and handling status in My Cloud VOC.
For any further questions, feel free to contact us through the chatbot.
Chatbot