更新时间:2024-10-25 GMT+08:00

下载流式数据

背景信息

下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置后,再循环获取数据。

获取游标有如下五种方式:

  • AT_SEQUENCE_NUMBER
  • AFTER_SEQUENCE_NUMBER
  • TRIM_HORIZON
  • LATEST
  • AT_TIMESTAMP

为更好理解游标类型,您需要了解如下几个基本概念。

  • 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。
  • 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用)
  • 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其sequenceNumberRange代表数据有效范围,第一个值为最老数据的sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据的sequenceNumber为此值-1)

    例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为200。

场景说明

下表介绍5种下载数据方式的适用场景,您可依据自己的需求进行适配。

表1 场景说明

游标类型(CursorType)

说明

适用场景

备注

AT_SEQUENCE_NUMBER

从特定序列号(即demo中starting-sequence-number定义的序列号)所在的记录开始读取数据。此类型为默认游标类型。

适用于有明确的起始sequenceNumber场景,例如已知需要从哪一条数据开始消费。

与序列号(sequenceNumber)和分区数据有效范围sequenceNumberRange【A,B】强相关。

指定的sequenceNumber应满足如下条件:

A<=sequenceNumber<=B

AFTER_SEQUENCE_NUMBER

从特定序列号(即demo中starting-sequence-number定义的序列号)后的记录开始读取数据。

适用于保存了上次消费位置的场景,例如每次消费都保存位置(记录到文件或checkpoint),若程序重启则可以从保存的位置之后开始恢复,此时用AT_SEQUENCE_NUMBER则会重复一条数据。

与序列号(sequenceNumber)和分区数据有效范围sequenceNumberRange【A,B】强相关。

指定的sequenceNumber应满足如下条件:

(A-1)<=sequenceNumber<=(B-1)

TRIM_HORIZON

从分区最老的数据开始消费,即读取分区内所有有效数据。

例如分区数据有效范围为[100, 200], 则会从100开始消费。

适用于不知道消费位置,则直接消费分区内所有有效数据的场景。

LATEST

从分区最新的数据之后开始消费,即不读取分区内的已有数据,而是从下一条上传的数据开始。

(如分区数据有效范围为[100, 200], 则会从200开始消费,如此时无数据上传,则获取的数据为空;如有数据上传,就会得到200,201,202,...)

适用于不知道消费位置,则丢弃分区已有的数据,从新上传的数据开始消费的场景。

AT_TIMESTAMP

指定一个时间戳,会从此时间戳上传的数据开始读取,要求在获取游标的时候,有一个明确的13位时间戳。

(例如指定1541742263206,表示读取从2018-11-09 13:44:23开始上传的数据)

适用于不知道消费位置,但想从指定的时间或者从已知上次消费的停止时间开始消费的场景

  • 若最老一条数据的上传时间为C,则timestamp>=c即可
  • 若timestamp大于最新一条数据的时间戳或者是未来时间,则从最新一条数据之后开始读取。

样例代码

使用初始化DIS客户端初始化后的客户端实例通过DIS通道获取数据。

  • 下载数据方式选择AT_SEQUENCE_NUMBER和AFTER_SEQUENCE_NUMBER时,样例代码示例如下:
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
//初始化DIS客户端实例
DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全;
            // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 
            .withAk(System.getenv("HUAWEICLOUD_SDK_AK"))
            .withSk(System.getenv("HUAWEICLOUD_SDK_SK"))
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();
// 配置通道名称  
String streamName = "streamName";    
// 配置数据下载分区ID  
String partitionId = "shardId-0000000000";    
// 配置下载数据序列号  
String startingSequenceNumber = "0";    
// 配置下载数据方式
// AT_SEQUENCE_NUMBER: 从指定的sequenceNumber开始获取,需要设置StartingSequenceNumber
// AFTER_SEQUENCE_NUMBER: 从指定的sequenceNumber之后开始获取,需要设置StartingSequenceNumber
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();
 
try 
{
      // 获取数据游标
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setCursorType(cursorType);
      request.setStartingSequenceNumber(startingSequenceNumber);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
      LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
      GetRecordsRequest recordsRequest = new GetRecordsRequest();
      GetRecordsResult recordResponse = null;
      while (true)
            {
                recordsRequest.setPartitionCursor(cursor);
                recordResponse = dic.getRecords(recordsRequest);
                // 下一批数据游标
                cursor = recordResponse.getNextPartitionCursor();

                for (Record record : recordResponse.getRecords())
                {
                    LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                        new String(record.getData().array()),
                        record.getPartitionKey(),
                        record.getSequenceNumber());
                }

                
            }
        }
    catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
  • 下载数据方式选择TRIM_HORIZON和 LATEST时,样例代码示例如下,参见加粗代码行,基于demo注释掉startingSequenceNumber字段。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
//初始化DIS客户端实例
DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            .withAk(System.getenv("HUAWEICLOUD_SDK_AK"))
            .withSk(System.getenv("HUAWEICLOUD_SDK_SK"))
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();
// 配置通道名称  
String streamName = "streamName";    
// 配置数据下载分区ID  
String partitionId = "shardId-0000000000";    
// 配置下载数据序列号  
//String startingSequenceNumber = "0";    
// 配置下载数据方式
// TRIM_HORIZON: 从最早被存储至分区的有效记录开始读取。
//LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。 
String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name();
 
try 
{
      // 获取数据游标
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setCursorType(cursorType);
      //request.setStartingSequenceNumber(startingSequenceNumber);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
      LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
      GetRecordsRequest recordsRequest = new GetRecordsRequest();
      GetRecordsResult recordResponse = null;
      while (true)
       {
                recordsRequest.setPartitionCursor(cursor);
                recordsRequest.setLimit(limit);
                recordResponse = dic.getRecords(recordsRequest);
                // 下一批数据游标
                cursor = recordResponse.getNextPartitionCursor();

                for (Record record : recordResponse.getRecords())
                {
                    LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                        new String(record.getData().array()),
                        record.getPartitionKey(),
                        record.getSequenceNumber());
                }

                if (recordResponse.getRecords().size() == 0)
                {
                    Thread.sleep(1000);
                }
            }
        }
        catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}
  • 下载数据方式选择TAT_TIMESTAMP时,样例代码示例如下,基于demo增加timestamp字段,添加如下加粗行代码。
 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
//初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。
DIS dic = DISClientBuilder.standard()
            .withEndpoint("xxxx")
            .withAk(System.getenv("HUAWEICLOUD_SDK_AK"))
            .withSk(System.getenv("HUAWEICLOUD_SDK_SK"))
            .withProjectId("xxxx")
            .withRegion("xxxx")
            .build();
// 配置通道名称  
String streamName = "streamName";    
// 配置数据下载分区ID  
String partitionId = "shardId-0000000000";    
// 配置下载数据序列号  
//String startingSequenceNumber = "0";  
//配置时间戳
long timestamp = 1542960693804L;  
// 配置下载数据方式
// AT_TIMESTAMP: 从特定时间戳(即timestamp定义的时间戳)开始读取。
String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name();
 
try 
{
      // 获取数据游标
      GetPartitionCursorRequest request = new GetPartitionCursorRequest();
      request.setStreamName(streamName);
      request.setPartitionId(partitionId);
      request.setCursorType(cursorType);
      //request.setStartingSequenceNumber(startingSequenceNumber);
      request.setTimestamp(timestamp);
      GetPartitionCursorResult response = dic.getPartitionCursor(request);
      String cursor = response.getPartitionCursor();
      LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);

            GetRecordsRequest recordsRequest = new GetRecordsRequest();
            GetRecordsResult recordResponse = null;
         while (true)
            {
                recordsRequest.setPartitionCursor(cursor);
                recordsRequest.setLimit(limit);
                recordResponse = dic.getRecords(recordsRequest);
                // 下一批数据游标
                cursor = recordResponse.getNextPartitionCursor();

                for (Record record : recordResponse.getRecords())
                {
                    LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
                        new String(record.getData().array()),
                        record.getPartitionKey(),
                        record.getSequenceNumber());
                }

                if (recordResponse.getRecords().size() == 0)
                {
                    Thread.sleep(1000);
                }
            }
        }
        catch (DISClientException e)
        {
            LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
                e.getMessage(),
                e);
        }
        catch (Exception e)
        {
            LOGGER.error(e.getMessage(), e);
        }
    }
}

参数说明

表2 参数说明

参数名

参数类型

说明

partitionId

String

分区ID。

说明:

请根据上传流式数据的执行结果,控制台的返回信息字段,例如 “partitionId [shardId-0000000000]”进行定义。

startingSequenceNumber

String

序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。

说明:

请根据上传流式数据的执行结果,控制台的返回信息字段,例如“sequenceNumber [1]”进行定义。

cursorType

String

游标类型。

  • AT_SEQUENCE_NUMBER:从特定序列号(即startingSequenceNumber定义的序列号)所在的记录开始读取数据。此类型为默认游标类型。
  • AFTER_SEQUENCE_NUMBER:从特定序列号(即startingSequenceNumber定义的序列号)后的记录开始读取数据。
  • TRIM_HORIZON:从最早被存储至分区的有效记录开始读取。

    例如,某租户使用DIS的通道,分别上传了三条数据A1,A2,A3。N天后(设定A1已过期,A2和A3仍在有效期范围内),该租户需要下载数据,并选择了TRIM_HORIZON这种下载方式。那么用户可下载的数据将从A2开始读取。

  • LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。
  • AT_TIMESTAMP:从特定时间戳(即timestamp定义的时间戳)开始读取。

运行程序

右键选择Run As > 1 Java Application运行程序,若程序运行成功,可以在控制台查看到类似如下信息:

1
2
3
4
5
6
7
14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2]. 
表3 参数说明

参数名

参数类型

说明

partition_key

String

用户上传数据时设置的partition_key。

说明:

上传数据时,如果传了partition_key参数,则下载数据时可返回此参数。如果上传数据时,未传partition_key参数,而是传入partition_id,则不返回partition_key。

startingSequenceNumber

String

序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。