Adding OBS Dump Tasks
Function
This API is used to add OBS dump tasks.
Calling Method
For details, see Calling APIs.
URI
POST /v2/{project_id}/streams/{stream_name}/transfer-tasks

Path parameters

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| project_id | Yes | String | Project ID |
| stream_name | Yes | String | Name of the stream |

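
As a minimal sketch of how the two path parameters fit into the URI, the following Python helper assembles the request URL. The endpoint host and the sample IDs are hypothetical placeholders, and percent-encoding the path segments is a defensive choice made here, not something this page specifies.

```python
from urllib.parse import quote


def build_transfer_task_url(endpoint: str, project_id: str, stream_name: str) -> str:
    """Return the URL for POST /v2/{project_id}/streams/{stream_name}/transfer-tasks."""
    # Percent-encode each path parameter so unexpected characters cannot alter the path.
    project = quote(project_id, safe="")
    stream = quote(stream_name, safe="")
    return f"https://{endpoint}/v2/{project}/streams/{stream}/transfer-tasks"


# Hypothetical values, for illustration only.
print(build_transfer_task_url("dis.example.com", "0123456789abcdef", "my-stream"))
```
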
Request Parameters

Request header parameters

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| X-Auth-Token | Yes | String | User token. It can be obtained by calling the IAM API used to obtain a user token. The value of X-Subject-Token in the response header is the user token. |


Request body parameters

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| destination_type | No | String | Type of the dumping task. |
| obs_destination_descriptor | No | Object | Parameter list of OBS to which data in the DIS stream will be dumped |


obs_destination_descriptor parameters

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| task_name | Yes | String | Definition: Name of the dump task. Constraints: N/A. Range: 1 to 64 characters. Only letters, digits, hyphens (-), and underscores (_) are allowed. Default Value: N/A. |
| agency_name | No | String | Definition: Name of the agency created in IAM. DIS uses an agency to access your specified resources. If agencies are available, you can use an IAM API to obtain the available agencies. If dump tasks are used on the console or through API calls, the system displays a message indicating that an agency named dis_admin_agency will be automatically created. Constraints: Agency parameter settings: Range: Name of the agency to be created. The value is 1 to 64 characters long and cannot be left unspecified. Default Value: N/A. |
| deliver_time_interval | Yes | Integer | Definition: User-defined interval, in seconds, at which data is imported into OBS. If no data is pushed during the current interval, no dump file package is generated. Constraints: N/A. Range: An integer from 30 to 900. Default Value: N/A. |
| deliver_size_interval | No | Integer | Definition: Dump file size, in MB. A dump is triggered when the size of the local cache reaches the threshold. Constraints: N/A. Range: An integer from 1 to 300. Default Value: N/A. |
| consumer_strategy | No | String | Definition: Offset. Constraints: When modifying a dump task, ensure that the offset is the same as that set when the dump task was created. For example, if the maximum offset was set during dump task creation, the maximum offset must be used during dump task modification. Range: Default Value: N/A. |
| file_prefix | No | String | Definition: Custom OBS directory for storing stream files. Constraints: Directory levels are separated by forward slashes (/). Range: A maximum of 50 characters, including letters, digits, underscores (_), and slashes (/). Default Value: Empty. |
| partition_format | No | String | Definition: Directory structure of the object file written into OBS. The directory structure is in the format of yyyy/MM/dd/HH/mm (time at which the dumping task was created). After the data is dumped successfully, the storage directory structure is obs_bucket_path/file_prefix/partition_format. For example, if the dump task was created at 14:49 on November 10, 2017, the directory structure is 2017 > 11 > 10 > 14 > 49. Constraints: N/A. Range: Default Value: N/A. |
| obs_bucket_path | Yes | String | Definition: Name of the OBS bucket used to store the stream data. Constraints: It consists of lowercase letters, digits, hyphens (-), underscores (_), and periods (.). Range: 3 to 63 characters, including lowercase letters, digits, hyphens (-), underscores (_), and periods (.). Default Value: N/A. |
| destination_file_type | No | String | Definition: Dump file format. Constraints: The csv and parquet formats can be selected only when Source Data Type is set to JSON and Dump Destination is set to OBS. Range: Default Value: Text. |
| processing_schema | No | ProcessingSchema object | Definition: Dump time directory generated based on the timestamp of the source data and the configured partition_format. Directory structure of the object file written into OBS. The directory structure is in the format of yyyy/MM/dd/HH/mm. Constraints: N/A. Range: N/A. Default Value: N/A. |
| record_delimiter | No | String | Definition: Delimiter for the dump file, which is used to separate the user data that is written into the dump file. Constraints: It consists of lowercase letters, digits, hyphens (-), underscores (_), and periods (.). Range: Default Value: "\n". |

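
The ranges listed for the descriptor fields can be checked on the client before the request is sent. The sketch below is one possible pre-flight check, not part of the DIS SDK: the function name is hypothetical, "letters" is assumed to mean ASCII letters, and only the fields whose ranges are fully spelled out above are covered.

```python
import re

# Patterns derived from the documented ranges (ASCII letters are an assumption).
TASK_NAME_RE = re.compile(r"[A-Za-z0-9_-]{1,64}")
FILE_PREFIX_RE = re.compile(r"[A-Za-z0-9_/]{0,50}")
BUCKET_RE = re.compile(r"[a-z0-9._-]{3,63}")


def validate_obs_descriptor(desc: dict) -> list:
    """Return a list of human-readable problems; an empty list means no problem was found."""
    errors = []
    if not TASK_NAME_RE.fullmatch(str(desc.get("task_name", ""))):
        errors.append("task_name: 1 to 64 letters, digits, hyphens, or underscores")
    if not BUCKET_RE.fullmatch(str(desc.get("obs_bucket_path", ""))):
        errors.append("obs_bucket_path: 3 to 63 lowercase letters, digits, '-', '_', or '.'")
    interval = desc.get("deliver_time_interval")
    if not isinstance(interval, int) or not 30 <= interval <= 900:
        errors.append("deliver_time_interval: mandatory integer from 30 to 900")
    size = desc.get("deliver_size_interval")
    if size is not None and (not isinstance(size, int) or not 1 <= size <= 300):
        errors.append("deliver_size_interval: integer from 1 to 300")
    prefix = desc.get("file_prefix") or ""
    if not FILE_PREFIX_RE.fullmatch(prefix):
        errors.append("file_prefix: at most 50 letters, digits, underscores, or slashes")
    return errors


# Values taken from the example request on this page.
print(validate_obs_descriptor({
    "task_name": "newtask",
    "obs_bucket_path": "obsbucket",
    "deliver_time_interval": 30,
    "file_prefix": "",
}))
```

The service remains the authority on what it accepts; a check like this only catches obvious mistakes early.
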

ProcessingSchema parameters

| Parameter | Mandatory | Type | Description |
|---|---|---|---|
| timestamp_name | Yes | String | Definition: Attribute name of the source data timestamp. Constraints: N/A. Range: N/A. Default Value: N/A. |
| timestamp_type | Yes | String | Definition: Type of the source data timestamp. Constraints: N/A. Range: Default Value: N/A. |
| timestamp_format | No | String | Definition: OBS directory generated based on the timestamp format. This parameter is mandatory when the timestamp type of the source data is String. Constraints: N/A. Range: N/A. Default Value: N/A. |

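
The storage layout obs_bucket_path/file_prefix/partition_format described for the dump task can be illustrated with a short sketch. It maps the yyyy/MM/dd/HH/mm tokens onto Python strftime codes and joins the three parts. The function name and the token table are assumptions made for illustration; how the service itself derives the directory is not specified on this page.

```python
from datetime import datetime

# Assumed mapping from the documented partition tokens to strftime codes.
_TOKENS = {"yyyy": "%Y", "MM": "%m", "dd": "%d", "HH": "%H", "mm": "%M"}


def dump_directory(bucket: str, file_prefix: str, partition_format: str, moment: datetime) -> str:
    """Build bucket/file_prefix/partition for the given time, skipping an empty prefix."""
    try:
        pattern = "/".join(_TOKENS[token] for token in partition_format.split("/"))
    except KeyError as exc:
        raise ValueError(f"unsupported partition token: {exc.args[0]}") from None
    parts = [bucket, file_prefix.strip("/"), moment.strftime(pattern)]
    return "/".join(part for part in parts if part)


# The example from the partition_format description: 14:49 on November 10, 2017.
print(dump_directory("obsbucket", "", "yyyy/MM/dd/HH/mm", datetime(2017, 11, 10, 14, 49)))
```
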
Response Parameters
Status code: 201
Normal response
None
Example Requests
- Adding OBS Dump Tasks

      POST https://{Endpoint}/v2/{project_id}/streams/{stream_name}/transfer-tasks

      {
        "destination_type" : "OBS",
        "obs_destination_descriptor" : {
          "task_name" : "newtask",
          "consumer_strategy" : "LATEST",
          "agency_name" : "dis_admin_agency",
          "destination_file_type" : "text",
          "obs_bucket_path" : "obsbucket",
          "file_prefix" : "",
          "partition_format" : "yyyy/MM/dd/HH/mm",
          "record_delimiter" : "|",
          "deliver_time_interval" : 30
        }
      }

- Adding OBS Dump Tasks (The dump file format is Parquet.)

      POST https://{Endpoint}/v2/{project_id}/streams/{stream_name}/transfer-tasks

      {
        "destination_type" : "OBS",
        "obs_destination_descriptor" : {
          "task_name" : "newtask",
          "consumer_strategy" : "LATEST",
          "agency_name" : "dis_admin_agency",
          "destination_file_type" : "parquet",
          "obs_bucket_path" : "obsbucket",
          "file_prefix" : "",
          "partition_format" : "yyyy/MM/dd/HH/mm",
          "record_delimiter" : "|",
          "deliver_time_interval" : 30
        }
      }

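
For callers who prefer plain HTTP over the SDK, the example requests can be assembled with the Python standard library. This is a sketch under stated assumptions: the helper name is hypothetical, the Content-Type header is an assumption not listed on this page, and the token must already have been obtained from IAM. The request is only built here, not sent.

```python
import json
import urllib.request


def build_create_task_request(url: str, token: str, task_name: str,
                              bucket: str, file_type: str = "text") -> urllib.request.Request:
    """Build (but do not send) the POST request that adds an OBS dump task."""
    body = {
        "destination_type": "OBS",
        "obs_destination_descriptor": {
            "task_name": task_name,
            "consumer_strategy": "LATEST",
            "agency_name": "dis_admin_agency",
            "destination_file_type": file_type,  # "text" or "parquet", as in the examples
            "obs_bucket_path": bucket,
            "file_prefix": "",
            "partition_format": "yyyy/MM/dd/HH/mm",
            "record_delimiter": "|",
            "deliver_time_interval": 30,
        },
    }
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        # Content-Type is an assumption; X-Auth-Token is the documented header.
        headers={"X-Auth-Token": token, "Content-Type": "application/json"},
        method="POST",
    )
```

Passing the returned object to urllib.request.urlopen would send it; a successful call is expected to return status code 201 with no body, and urlopen raises urllib.error.HTTPError for error statuses.
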
Example Responses
None
SDK Sample Code
The SDK sample code is as follows.
Java
- Adding OBS Dump Tasks

      package com.huaweicloud.sdk.test;

      import com.huaweicloud.sdk.core.auth.ICredential;
      import com.huaweicloud.sdk.core.auth.BasicCredentials;
      import com.huaweicloud.sdk.core.exception.ConnectionException;
      import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
      import com.huaweicloud.sdk.core.exception.ServiceResponseException;
      import com.huaweicloud.sdk.dis.v2.region.DisRegion;
      import com.huaweicloud.sdk.dis.v2.*;
      import com.huaweicloud.sdk.dis.v2.model.*;

      public class CreateObsTransferTaskSolution {

          public static void main(String[] args) {
              // The AK and SK used for authentication are hard-coded or stored in plaintext,
              // which has great security risks. It is recommended that the AK and SK be stored
              // in ciphertext in configuration files or environment variables and decrypted
              // during use to ensure security.
              // In this example, AK and SK are stored in environment variables for authentication.
              // Before running this example, set environment variables CLOUD_SDK_AK and
              // CLOUD_SDK_SK in the local environment.
              String ak = System.getenv("CLOUD_SDK_AK");
              String sk = System.getenv("CLOUD_SDK_SK");
              String projectId = "{project_id}";

              ICredential auth = new BasicCredentials()
                      .withProjectId(projectId)
                      .withAk(ak)
                      .withSk(sk);

              DisClient client = DisClient.newBuilder()
                      .withCredential(auth)
                      .withRegion(DisRegion.valueOf("<YOUR REGION>"))
                      .build();
              CreateObsTransferTaskRequest request = new CreateObsTransferTaskRequest();
              request.withStreamName("{stream_name}");
              CreateTransferTaskReq body = new CreateTransferTaskReq();
              OBSDestinationDescriptorRequest obsDestinationDescriptorbody = new OBSDestinationDescriptorRequest();
              obsDestinationDescriptorbody.withTaskName("newtask")
                  .withObsBucketPath("obsbucket")
                  .withAgencyName("dis_admin_agency")
                  .withDestinationFileType(OBSDestinationDescriptorRequest.DestinationFileTypeEnum.fromValue("text"))
                  .withRecordDelimiter("|")
                  .withDeliverTimeInterval(30)
                  .withFilePrefix("")
                  .withPartitionFormat(OBSDestinationDescriptorRequest.PartitionFormatEnum.fromValue("yyyy/MM/dd/HH/mm"))
                  .withConsumerStrategy(OBSDestinationDescriptorRequest.ConsumerStrategyEnum.fromValue("LATEST"));
              body.withObsDestinationDescriptor(obsDestinationDescriptorbody);
              body.withDestinationType(CreateTransferTaskReq.DestinationTypeEnum.fromValue("OBS"));
              request.withBody(body);
              try {
                  CreateObsTransferTaskResponse response = client.createObsTransferTask(request);
                  System.out.println(response.toString());
              } catch (ConnectionException e) {
                  e.printStackTrace();
              } catch (RequestTimeoutException e) {
                  e.printStackTrace();
              } catch (ServiceResponseException e) {
                  e.printStackTrace();
                  System.out.println(e.getHttpStatusCode());
                  System.out.println(e.getRequestId());
                  System.out.println(e.getErrorCode());
                  System.out.println(e.getErrorMsg());
              }
          }
      }

- Adding OBS Dump Tasks (The dump file format is Parquet.)

      package com.huaweicloud.sdk.test;

      import com.huaweicloud.sdk.core.auth.ICredential;
      import com.huaweicloud.sdk.core.auth.BasicCredentials;
      import com.huaweicloud.sdk.core.exception.ConnectionException;
      import com.huaweicloud.sdk.core.exception.RequestTimeoutException;
      import com.huaweicloud.sdk.core.exception.ServiceResponseException;
      import com.huaweicloud.sdk.dis.v2.region.DisRegion;
      import com.huaweicloud.sdk.dis.v2.*;
      import com.huaweicloud.sdk.dis.v2.model.*;

      public class CreateObsTransferTaskSolution {

          public static void main(String[] args) {
              // The AK and SK used for authentication are hard-coded or stored in plaintext,
              // which has great security risks. It is recommended that the AK and SK be stored
              // in ciphertext in configuration files or environment variables and decrypted
              // during use to ensure security.
              // In this example, AK and SK are stored in environment variables for authentication.
              // Before running this example, set environment variables CLOUD_SDK_AK and
              // CLOUD_SDK_SK in the local environment.
              String ak = System.getenv("CLOUD_SDK_AK");
              String sk = System.getenv("CLOUD_SDK_SK");
              String projectId = "{project_id}";

              ICredential auth = new BasicCredentials()
                      .withProjectId(projectId)
                      .withAk(ak)
                      .withSk(sk);

              DisClient client = DisClient.newBuilder()
                      .withCredential(auth)
                      .withRegion(DisRegion.valueOf("<YOUR REGION>"))
                      .build();
              CreateObsTransferTaskRequest request = new CreateObsTransferTaskRequest();
              request.withStreamName("{stream_name}");
              CreateTransferTaskReq body = new CreateTransferTaskReq();
              OBSDestinationDescriptorRequest obsDestinationDescriptorbody = new OBSDestinationDescriptorRequest();
              obsDestinationDescriptorbody.withTaskName("newtask")
                  .withObsBucketPath("obsbucket")
                  .withAgencyName("dis_admin_agency")
                  .withDestinationFileType(OBSDestinationDescriptorRequest.DestinationFileTypeEnum.fromValue("parquet"))
                  .withRecordDelimiter("|")
                  .withDeliverTimeInterval(30)
                  .withFilePrefix("")
                  .withPartitionFormat(OBSDestinationDescriptorRequest.PartitionFormatEnum.fromValue("yyyy/MM/dd/HH/mm"))
                  .withConsumerStrategy(OBSDestinationDescriptorRequest.ConsumerStrategyEnum.fromValue("LATEST"));
              body.withObsDestinationDescriptor(obsDestinationDescriptorbody);
              body.withDestinationType(CreateTransferTaskReq.DestinationTypeEnum.fromValue("OBS"));
              request.withBody(body);
              try {
                  CreateObsTransferTaskResponse response = client.createObsTransferTask(request);
                  System.out.println(response.toString());
              } catch (ConnectionException e) {
                  e.printStackTrace();
              } catch (RequestTimeoutException e) {
                  e.printStackTrace();
              } catch (ServiceResponseException e) {
                  e.printStackTrace();
                  System.out.println(e.getHttpStatusCode());
                  System.out.println(e.getRequestId());
                  System.out.println(e.getErrorCode());
                  System.out.println(e.getErrorMsg());
              }
          }
      }

Python
- Adding OBS Dump Tasks

      # coding: utf-8

      import os
      from huaweicloudsdkcore.auth.credentials import BasicCredentials
      from huaweicloudsdkdis.v2.region.dis_region import DisRegion
      from huaweicloudsdkcore.exceptions import exceptions
      from huaweicloudsdkdis.v2 import *

      if __name__ == "__main__":
          # The AK and SK used for authentication are hard-coded or stored in plaintext,
          # which has great security risks. It is recommended that the AK and SK be stored
          # in ciphertext in configuration files or environment variables and decrypted
          # during use to ensure security.
          # In this example, AK and SK are stored in environment variables for authentication.
          # Before running this example, set environment variables CLOUD_SDK_AK and
          # CLOUD_SDK_SK in the local environment.
          ak = os.environ["CLOUD_SDK_AK"]
          sk = os.environ["CLOUD_SDK_SK"]
          projectId = "{project_id}"

          credentials = BasicCredentials(ak, sk, projectId)

          client = DisClient.new_builder() \
              .with_credentials(credentials) \
              .with_region(DisRegion.value_of("<YOUR REGION>")) \
              .build()

          try:
              request = CreateObsTransferTaskRequest()
              request.stream_name = "{stream_name}"
              obsDestinationDescriptorbody = OBSDestinationDescriptorRequest(
                  task_name="newtask",
                  obs_bucket_path="obsbucket",
                  agency_name="dis_admin_agency",
                  destination_file_type="text",
                  record_delimiter="|",
                  deliver_time_interval=30,
                  file_prefix="",
                  partition_format="yyyy/MM/dd/HH/mm",
                  consumer_strategy="LATEST"
              )
              request.body = CreateTransferTaskReq(
                  obs_destination_descriptor=obsDestinationDescriptorbody,
                  destination_type="OBS"
              )
              response = client.create_obs_transfer_task(request)
              print(response)
          except exceptions.ClientRequestException as e:
              print(e.status_code)
              print(e.request_id)
              print(e.error_code)
              print(e.error_msg)

- Adding OBS Dump Tasks (The dump file format is Parquet.)

      # coding: utf-8

      import os
      from huaweicloudsdkcore.auth.credentials import BasicCredentials
      from huaweicloudsdkdis.v2.region.dis_region import DisRegion
      from huaweicloudsdkcore.exceptions import exceptions
      from huaweicloudsdkdis.v2 import *

      if __name__ == "__main__":
          # The AK and SK used for authentication are hard-coded or stored in plaintext,
          # which has great security risks. It is recommended that the AK and SK be stored
          # in ciphertext in configuration files or environment variables and decrypted
          # during use to ensure security.
          # In this example, AK and SK are stored in environment variables for authentication.
          # Before running this example, set environment variables CLOUD_SDK_AK and
          # CLOUD_SDK_SK in the local environment.
          ak = os.environ["CLOUD_SDK_AK"]
          sk = os.environ["CLOUD_SDK_SK"]
          projectId = "{project_id}"

          credentials = BasicCredentials(ak, sk, projectId)

          client = DisClient.new_builder() \
              .with_credentials(credentials) \
              .with_region(DisRegion.value_of("<YOUR REGION>")) \
              .build()

          try:
              request = CreateObsTransferTaskRequest()
              request.stream_name = "{stream_name}"
              obsDestinationDescriptorbody = OBSDestinationDescriptorRequest(
                  task_name="newtask",
                  obs_bucket_path="obsbucket",
                  agency_name="dis_admin_agency",
                  destination_file_type="parquet",
                  record_delimiter="|",
                  deliver_time_interval=30,
                  file_prefix="",
                  partition_format="yyyy/MM/dd/HH/mm",
                  consumer_strategy="LATEST"
              )
              request.body = CreateTransferTaskReq(
                  obs_destination_descriptor=obsDestinationDescriptorbody,
                  destination_type="OBS"
              )
              response = client.create_obs_transfer_task(request)
              print(response)
          except exceptions.ClientRequestException as e:
              print(e.status_code)
              print(e.request_id)
              print(e.error_code)
              print(e.error_msg)

Go
- Adding OBS Dump Tasks

      package main

      import (
          "fmt"
          "os"

          "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
          dis "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2"
          "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/model"
          region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/region"
      )

      func main() {
          // The AK and SK used for authentication are hard-coded or stored in plaintext,
          // which has great security risks. It is recommended that the AK and SK be stored
          // in ciphertext in configuration files or environment variables and decrypted
          // during use to ensure security.
          // In this example, AK and SK are stored in environment variables for authentication.
          // Before running this example, set environment variables CLOUD_SDK_AK and
          // CLOUD_SDK_SK in the local environment.
          ak := os.Getenv("CLOUD_SDK_AK")
          sk := os.Getenv("CLOUD_SDK_SK")
          projectId := "{project_id}"

          auth := basic.NewCredentialsBuilder().
              WithAk(ak).
              WithSk(sk).
              WithProjectId(projectId).
              Build()

          client := dis.NewDisClient(
              dis.DisClientBuilder().
                  WithRegion(region.ValueOf("<YOUR REGION>")).
                  WithCredential(auth).
                  Build())

          request := &model.CreateObsTransferTaskRequest{}
          request.StreamName = "{stream_name}"
          destinationFileTypeObsDestinationDescriptor := model.GetObsDestinationDescriptorRequestDestinationFileTypeEnum().TEXT
          recordDelimiterObsDestinationDescriptor := "|"
          filePrefixObsDestinationDescriptor := ""
          partitionFormatObsDestinationDescriptor := model.GetObsDestinationDescriptorRequestPartitionFormatEnum().YYYY_MM_DD_HH_MM
          consumerStrategyObsDestinationDescriptor := model.GetObsDestinationDescriptorRequestConsumerStrategyEnum().LATEST
          obsDestinationDescriptorbody := &model.ObsDestinationDescriptorRequest{
              TaskName:            "newtask",
              ObsBucketPath:       "obsbucket",
              AgencyName:          "dis_admin_agency",
              DestinationFileType: &destinationFileTypeObsDestinationDescriptor,
              RecordDelimiter:     &recordDelimiterObsDestinationDescriptor,
              DeliverTimeInterval: int32(30),
              FilePrefix:          &filePrefixObsDestinationDescriptor,
              PartitionFormat:     &partitionFormatObsDestinationDescriptor,
              ConsumerStrategy:    &consumerStrategyObsDestinationDescriptor,
          }
          request.Body = &model.CreateTransferTaskReq{
              ObsDestinationDescriptor: obsDestinationDescriptorbody,
              DestinationType:          model.GetCreateTransferTaskReqDestinationTypeEnum().OBS,
          }
          response, err := client.CreateObsTransferTask(request)
          if err == nil {
              fmt.Printf("%+v\n", response)
          } else {
              fmt.Println(err)
          }
      }

- Adding OBS Dump Tasks (The dump file format is Parquet.)

      package main

      import (
          "fmt"
          "os"

          "github.com/huaweicloud/huaweicloud-sdk-go-v3/core/auth/basic"
          dis "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2"
          "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/model"
          region "github.com/huaweicloud/huaweicloud-sdk-go-v3/services/dis/v2/region"
      )

      func main() {
          // The AK and SK used for authentication are hard-coded or stored in plaintext,
          // which has great security risks. It is recommended that the AK and SK be stored
          // in ciphertext in configuration files or environment variables and decrypted
          // during use to ensure security.
          // In this example, AK and SK are stored in environment variables for authentication.
          // Before running this example, set environment variables CLOUD_SDK_AK and
          // CLOUD_SDK_SK in the local environment.
          ak := os.Getenv("CLOUD_SDK_AK")
          sk := os.Getenv("CLOUD_SDK_SK")
          projectId := "{project_id}"

          auth := basic.NewCredentialsBuilder().
              WithAk(ak).
              WithSk(sk).
              WithProjectId(projectId).
              Build()

          client := dis.NewDisClient(
              dis.DisClientBuilder().
                  WithRegion(region.ValueOf("<YOUR REGION>")).
                  WithCredential(auth).
                  Build())

          request := &model.CreateObsTransferTaskRequest{}
          request.StreamName = "{stream_name}"
          destinationFileTypeObsDestinationDescriptor := model.GetObsDestinationDescriptorRequestDestinationFileTypeEnum().PARQUET
          recordDelimiterObsDestinationDescriptor := "|"
          filePrefixObsDestinationDescriptor := ""
          partitionFormatObsDestinationDescriptor := model.GetObsDestinationDescriptorRequestPartitionFormatEnum().YYYY_MM_DD_HH_MM
          consumerStrategyObsDestinationDescriptor := model.GetObsDestinationDescriptorRequestConsumerStrategyEnum().LATEST
          obsDestinationDescriptorbody := &model.ObsDestinationDescriptorRequest{
              TaskName:            "newtask",
              ObsBucketPath:       "obsbucket",
              AgencyName:          "dis_admin_agency",
              DestinationFileType: &destinationFileTypeObsDestinationDescriptor,
              RecordDelimiter:     &recordDelimiterObsDestinationDescriptor,
              DeliverTimeInterval: int32(30),
              FilePrefix:          &filePrefixObsDestinationDescriptor,
              PartitionFormat:     &partitionFormatObsDestinationDescriptor,
              ConsumerStrategy:    &consumerStrategyObsDestinationDescriptor,
          }
          request.Body = &model.CreateTransferTaskReq{
              ObsDestinationDescriptor: obsDestinationDescriptorbody,
              DestinationType:          model.GetCreateTransferTaskReqDestinationTypeEnum().OBS,
          }
          response, err := client.CreateObsTransferTask(request)
          if err == nil {
              fmt.Printf("%+v\n", response)
          } else {
              fmt.Println(err)
          }
      }

More
For SDK sample code of more programming languages, see the Sample Code tab in API Explorer. SDK sample code can be automatically generated.
Status Codes

| Status Code | Description |
|---|---|
| 201 | Normal response |

Error Codes
See Error Codes.