参考初始化DIS客户端的操作初始化一个DIS客户端实例,实例名称为dic。
使用DIS SDK创建转储任务,需要指定通道名称、转储任务名称,转储周期,转储目标服务等信息。
添加转储到对象存储服务(OBS)的转储任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
CreateTransferTaskRequest request = new CreateTransferTaskRequest();
// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);
// 添加OBS转储任务,并设置任务名称
OBSDestinationDescriptorRequest descriptor = new OBSDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);
// 转储至对象存储服务(简称OBS):OBS桶名和子文件夹名,通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");
// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);
// 可选:在DIS管理页面自动创建名称为“dis_admin_agency”的IAM委托,默认采用此委托,用于授权访问。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");
// 可选,转储OBS的目标文件格式:默认text,可配置parquet、carbon
descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());
// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());
request.setObsDestinationDescriptor(descriptor);
|
配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。
|
dic.createTransferTask(request);
|
添加转储到MapReduce服务(MRS)的转储任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
|
CreateTransferTaskRequest request = new CreateTransferTaskRequest();
//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);
//添加MRS转储任务,并设置任务名称
MRSDestinationDescriptorRequest descriptor = new MRSDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);
// 配置MRS集群信息:集群名称和集群ID。可通过弹性大数据服务(简称MRS)控制台创建和查询,集群需为非安全模式
descriptor.setMrsClusterName("mrs_dis");
descriptor.setMrsClusterId("fe69a732-c7d3-4b0f-8cda-ec9eca0cf141");
// 转储MRS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");
// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);
// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");
// 转储OBS的目标文件格式:默认text,可配置parquet、carbon
descriptor.setDestinationFileType(DestinationFileTypeEnum.TEXT.getType());
// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());
request.setMrsDestinationDescriptor(descriptor);
|
配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。
|
dic.createTransferTask(request);
|
添加转储到数据湖探索服务(DLI)的转储任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
|
CreateTransferTaskRequest request = new CreateTransferTaskRequest();
// 配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);
// 添加DLI转储任务,并设置任务名称
UqueryDestinationDescriptorRequest descriptor = new UqueryDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);
// 配置DLI相关信息:数据库和内表名称。可通过数据湖探索(简称DLI)控制台创建和查询,DLI表需为内表
descriptor.setDliDatabaseName("dis_dli");
descriptor.setDliTableName("dis_test");
// 转储DLI通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");
// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);
// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");
// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());
request.setDliDestinationDescriptor(descriptor);
|
配置“CreateTransferTaskRequest”对象之后,通过调用createTransferTask的方法创建转储任务。
|
dic.createTransferTask(request);
|
添加转储到数据仓库服务(DWS)的转储任务
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
|
CreateTransferTaskRequest request = new CreateTransferTaskRequest();
//配置通道名称:用户在数据接入服务(简称DIS)控制台创建通道
request.setStreamName(streamName);
//添加DWS转储任务,并设置任务名称
DwsDestinationDescriptorRequest descriptor = new DwsDestinationDescriptorRequest();
descriptor.setTransferTaskName(taskName);
// 配置DWS集群信息:集群名称、集群ID、数据库等信息。可通过数据仓库服务(简称DWS)控制台创建和查询集群,并通过客户端或其他方式创建数据表
descriptor.setDwsClusterName("dis_test");
descriptor.setDwsClusterId("92f90f6a-de4d-4689-82f6-320c328b0062");
descriptor.setDwsDatabaseName("postgres");
descriptor.setDwsSchema("dbadmin");
descriptor.setDwsTableName("distable01");
descriptor.setDwsDelimiter("|");
descriptor.setUserName(System.getenv("DB_ADMIN"));
descriptor.setUserPassword(System.getenv("DB_PASSWORD"));
//DIS调用KMS服务加密存储DWS的密码,保证用户数据安全:用户可通过数据加密服务(简称KMS)控制台的"密钥管理"创建和查询KMS密钥信息
descriptor.setKmsUserKeyName("qiyinshan");
descriptor.setKmsUserKeyId("9521c600-64a8-4971-ad36-7bbfa6d00c41");
// 转储DWS通过OBS服务中转,需配置OBS桶名和子文件夹名,此目录也用于保存转储失败的源数据文件。可通过OBS控制台或客户端创建桶和文件夹
descriptor.setObsBucketPath("obs-dis");
descriptor.setFilePrefix("transfertask");
// 转储周期,单位s
descriptor.setDeliverTimeInterval(900);
// 可选:在DIS管理页面自动创建dis_admin_agency委托后,默认采用此委托。如未创建过IAM委托,请用主账户登录DIS控制台并创建通道,点击“添加转储任务”,前往授权。
descriptor.setAgencyName("dis_admin_agency");
// 设置从DIS通道拉取数据时的初始偏移量: 默认LATEST,从通道内最新上传的记录开始读取; TRIM_HORIZON,从通道内最早的未过期记录开始读取
descriptor.setConsumerStrategy(PartitionCursorTypeEnum.LATEST.name());
request.setDwsDestinationDescriptor(descriptor);
|
配置“CreateTransferTaskRequest”对象之后,通过对客户端调用createTransferTask的方法创建转储任务。
|
dic.createTransferTask(request);
|