更新时间:2024-10-25 GMT+08:00

添加转储任务

参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。

使用DIS SDK创建转储任务,需要指定通道名称、转储任务名称,转储周期,转储目标服务等信息。

添加转储到对象存储服务(OBS)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

// 添加OBS转储任务,并设置任务名称
OBSDestinationDescriptorRequest descriptor = new OBSDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 转储至对象存储服务(简称OBS):OBS桶名和子文件夹名,通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建名称为“dis_admin_agency”的IAM委托,默认采用此委托,用于授权访问。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 可选,转储OBS的目标文件格式:默认text,可配置parquet、carbon
descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setObsDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到MapReduce服务(MRS)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

//添加MRS转储任务,并设置任务名称
MRSDestinationDescriptorRequest descriptor = new MRSDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置MRS集群信息:集群名称和集群ID。可通过弹性大数据服务(简称MRS)控制台创建和查询,集群需为非安全模式
descriptor.setMrsClusterName("mrs_dis");
descriptor.setMrsClusterId("fe69a732-c7d3-4b0f-8cda-ec9eca0cf141");

// 转储MRS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 转储OBS的目标文件格式:默认text,可配置parquet、carbon
descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setMrsDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到数据湖探索服务(DLI)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

// 添加DLI转储任务,并设置任务名称
UqueryDestinationDescriptorRequest descriptor = new UqueryDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置DLI相关信息:数据库和内表名称。可通过数据湖探索(简称DLI)控制台创建和查询,DLI表需为内表
descriptor.setDliDatabaseName("dis_dli");
descriptor.setDliTableName("dis_test");

// 转储DLI通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setDliDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);

添加转储到数据仓库服务(DWS)的转储任务

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
CreateTransferTaskRequest request = new CreateTransferTaskRequest();

//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);

//添加DWS转储任务,并设置任务名称
DwsDestinationDescriptorRequest descriptor = new DwsDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);

// 配置DWS集群信息:集群名称、集群ID、数据库等信息。可通过数据仓库服务(简称DWS)控制台创建和查询集群,并通过客户端或其他方式创建数据表
descriptor.setDwsClusterName("dis_test");
descriptor.setDwsClusterId("92f90f6a-de4d-4689-82f6-320c328b0062");
descriptor.setDwsDatabaseName("postgres");
descriptor.setDwsSchema("dbadmin");
descriptor.setDwsTableName("distable01");
descriptor.setDwsDelimiter("|");
descriptor.setUserName(System.getenv("DB_ADMIN"));
descriptor.setUserPassword(System.getenv("DB_PASSWORD"));

//DIS调用KMS服务加密存储DWS的密码,保证用户数据安全:用户可通过数据加密服务(简称KMS)控制台的"密钥管理"创建和查询KMS密钥信息
descriptor.setKmsUserKeyName("qiyinshan");
descriptor.setKmsUserKeyId("9521c600-64a8-4971-ad36-7bbfa6d00c41");

// 转储DWS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");

// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);

// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");

// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());

request.setDwsDestinationDescriptor(descriptor);

配置“CreateTransferTaskRequest”对象之后,通过对客户端调用createTransferTask的方法创建转储任务。

1
dic.createTransferTask(request);