文档首页 > > SDK参考> 使用SDK> 使用SDK(Java)> 添加转储任务

添加转储任务

分享
更新时间: 2019/08/27 GMT+08:00

参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。

使用DIS SDK创建转储任务,需要指定通道名称、转储任务名称,转储周期,转储目标服务等信息。示例代码为“dis-sdk-demo\src\main\java\com\bigdata\dis\sdk\demo\example”目录下的“TransferTaskDemo.java”文件。

添加转储到对象存储服务(OBS)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

// 添加OBS转储任务,并设置任务名称
OBSDestinationDescriptorRequest descriptor = new OBSDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 转储至对象存储服务(简称OBS):OBS桶名和子文件夹名,通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建名称为“dis_admin_agency”的IAM委托,默认采用此委托,用于授权访问。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 可选,转储OBS的目标文件格式:默认text,可配置parquet、carbon
descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setObsDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到MapReduce服务(MRS)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

//添加MRS转储任务,并设置任务名称
MRSDestinationDescriptorRequest descriptor = new MRSDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置MRS集群信息:集群名称和集群ID。可通过弹性大数据服务(简称MRS)控制台创建和查询,集群需为非安全模式
descriptor.setMrsClusterName("mrs_dis");
descriptor.setMrsClusterId("fe69a732-c7d3-4b0f-8cda-ec9eca0cf141");

// 转储MRS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 转储OBS的目标文件格式:默认text,可配置parquet、carbon
descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setMrsDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到数据湖探索服务(DLI)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

// 添加DLI转储任务,并设置任务名称
UqueryDestinationDescriptorRequest descriptor = new UqueryDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置DLI相关信息:数据库和内表名称。可通过数据湖探索(简称DLI)控制台创建和查询,DLI表需为内表
descriptor.setDliDatabaseName("dis_dli");
descriptor.setDliTableName("dis_test");

// 转储DLI通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setDliDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到数据仓库服务(DWS)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

//添加DWS转储任务,并设置任务名称
DwsDestinationDescriptorRequest descriptor = new DwsDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置DWS集群信息:集群名称、集群ID、数据库等信息。可通过数据仓库服务(简称DWS)控制台创建和查询集群,并通过客户端或其他方式创建数据表
descriptor.setDwsClusterName("dis_test");
descriptor.setDwsClusterId("92f90f6a-de4d-4689-82f6-320c328b0062");
descriptor.setDwsDatabaseName("postgres");
descriptor.setDwsSchema("dbadmin");
descriptor.setDwsTableName("distable01");
descriptor.setDwsDelimiter("|");
descriptor.setUserName("dbadmin");
descriptor.setUserPassword("xxxx");

//DIS调用KMS服务加密存储DWS的密码,保证用户数据安全:用户可通过数据加密服务(简称KMS)控制台的"秘钥管理"创建和查询KMS秘钥信息
descriptor.setKmsUserKeyName("qiyinshan");
descriptor.setKmsUserKeyId("9521c600-64a8-4971-ad36-7bbfa6d00c41");

// 转储DWS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setDwsDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过对客户端调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到表格存储服务(CloudTable) HBase的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

//添加CloudTable转储任务,并设置任务名称
CloudtableDestinationDescriptorRequest descriptor = new CloudtableDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置CloudTable集群信息:集群名称、集群ID、数据库等信息。可通过表格存储服务(简称CloudTable)控制台创建和查询集群,并通过客户端或其他方式创建数据表
descriptor.setCloudtableClusterName("dis_test");
descriptor.setCloudtableClusterId("92f90f6a-de4d-4689-82f6-320c328b0062");
descriptor.setCloudtableTableName("dis");

//CloudtableSchema的配置方法参考“用户指南”
CloudtableSchema cloudtableSchema = new CloudtableSchema();
List<SchemaField> rowKeySchema = new ArrayList<>();
SchemaField field1 = new SchemaField();
field1.setValue("id");
field1.setType("String");
rowKeySchema.add(field1);
SchemaField rField1 = new SchemaField();
rField1.setValue("group.users.id");
rField1.setType("String");
rowKeySchema.add(rField1);
List<SchemaField> columnsSchema = new ArrayList<>();
SchemaField field2 = new SchemaField();
field2.setColumnFamilyName("user");
field2.setQualifierName("id");
field2.setValue("group.users.id");
field2.setType("String");
SchemaField field3 = new SchemaField();
field3.setColumnFamilyName("user");
field3.setQualifierName("age");
field3.setValue("group.users.age");
field3.setType("Int");
columnsSchema.add(field2);
columnsSchema.add(field3);
cloudtableSchema.setRowKeySchema(rowKeySchema);
cloudtableSchema.setColumnsSchema(columnsSchema);
descriptor.setCloudtableSchema(cloudtableSchema);

// 配置OBS桶名和子文件夹名,用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBackupBucketPath("obs-dis");
descriptor.setBackupfilePrefix("transfertask");

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setCloudtableDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到表格存储服务(CloudTable) openTSDB的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

//添加转储至CloudTable openTSDB的任务,并设置任务名称
CloudtableDestinationDescriptorRequest descriptor = new CloudtableDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置CloudTable集群信息:集群名称、集群ID、数据库等信息。可通过表格存储服务(简称CloudTable)控制台创建和查询集群,并通过客户端或其他方式创建数据表
descriptor.setCloudtableClusterName("dlf_test");
descriptor.setCloudtableClusterId("92f90f6a-de4d-4689-82f6-320c328b0062");

//OpenTSDBSchema的配置方法参考“用户指南”
List<SchemaField> metricSchema = new ArrayList<>();
SchemaField field1 = new SchemaField();
field1.setValue("group.users.id");
field1.setType("String");
metricSchema.add(field1);
SchemaField timestampSchema = new SchemaField();
timestampSchema.setColumnFamilyName("user");
timestampSchema.setFormat("yyyy/MM/dd HH:mm:ss");
timestampSchema.setValue("group.users.birthday");
timestampSchema.setType("String");
SchemaField valueSchema = new SchemaField();
valueSchema.setValue("group.users.age");
valueSchema.setType("Int");
List<SchemaField> tagsSchema = new ArrayList<>();
SchemaField field2 = new SchemaField();
field2.setName("group.users.id");
field2.setValue("group.users.id");
field2.setType("String");
SchemaField field3 = new SchemaField();
field3.setName("age");
field3.setValue("group.users.age");
field3.setType("Int");
tagsSchema.add(field2);
tagsSchema.add(field3);
OpenTSDBSchema openTSDBSchema = new OpenTSDBSchema();
openTSDBSchema.setMetricSchema(metricSchema);
openTSDBSchema.setTimestampSchema(timestampSchema);
openTSDBSchema.setValueSchema(valueSchema);
openTSDBSchema.setTagsSchema(tagsSchema);
List<OpenTSDBSchema> openTSDBSchemaList = new ArrayList<>();
openTSDBSchemaList.add(openTSDBSchema);
descriptor.setOpentsdbSchema(openTSDBSchemaList);

// 配置OBS桶名和子文件夹名,用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBackupBucketPath("obs-dis");
descriptor.setBackupfilePrefix("transfertask");

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setCloudtableDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过对客户端调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);
分享:

    相关文档

    相关产品

文档是否有解决您的问题?

提交成功!

非常感谢您的反馈,我们会继续努力做到更好!

反馈提交失败,请稍后再试!

*必选

请至少选择或填写一项反馈信息

字符长度不能超过200

提交反馈 取消

如您有其它疑问,您也可以通过华为云社区问答频道来与我们联系探讨

智能客服提问云社区提问