下载流式数据
背景信息
下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置后,再循环获取数据。
获取游标有如下五种方式:
- AT_SEQUENCE_NUMBER
- AFTER_SEQUENCE_NUMBER
- TRIM_HORIZON
- LATEST
- AT_TIMESTAMP
为更好理解游标类型,您需要了解如下几个基本概念。
- 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。
- 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用)
- 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其sequenceNumberRange代表数据有效范围,第一个值为最老数据的sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据的sequenceNumber为此值-1)
例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为200。
场景说明
下表介绍5种下载数据方式的适用场景,您可依据自己的需求进行适配。
游标类型(CursorType) |
说明 |
适用场景 |
备注 |
---|---|---|---|
AT_SEQUENCE_NUMBER |
从特定序列号(即demo中starting-sequence-number定义的序列号)所在的记录开始读取数据。此类型为默认游标类型。 |
适用于有明确的起始sequenceNumber场景,例如已知需要从哪一条数据开始消费。 |
与序列号(sequenceNumber)和分区数据有效范围sequenceNumberRange【A,B】强相关。 指定的sequenceNumber应满足如下条件: A<=sequenceNumber<=B |
AFTER_SEQUENCE_NUMBER |
从特定序列号(即demo中starting-sequence-number定义的序列号)后的记录开始读取数据。 |
适用于保存了上次消费位置的场景,例如每次消费都保存位置(记录到文件或checkpoint),若程序重启则可以从保存的位置之后开始恢复,此时用AT_SEQUENCE_NUMBER则会重复一条数据。 |
与序列号(sequenceNumber)和分区数据有效范围sequenceNumberRange【A,B】强相关。 指定的sequenceNumber应满足如下条件: (A-1)<=sequenceNumber<=(B-1) |
TRIM_HORIZON |
从分区最老的数据开始消费,即读取分区内所有有效数据。 例如分区数据有效范围为[100, 200], 则会从100开始消费。 |
适用于不知道消费位置,则直接消费分区内所有有效数据的场景。 |
无 |
LATEST |
从分区最新的数据之后开始消费,即不读取分区内的已有数据,而是从下一条上传的数据开始。 (如分区数据有效范围为[100, 200], 则会从200开始消费,如此时无数据上传,则获取的数据为空;如有数据上传,就会得到200,201,202,...) |
适用于不知道消费位置,则丢弃分区已有的数据,从新上传的数据开始消费的场景。 |
无 |
AT_TIMESTAMP |
指定一个时间戳,会从此时间戳上传的数据开始读取,要求在获取游标的时候,有一个明确的13位时间戳。 (例如指定1541742263206,表示读取从2018-11-09 13:44:23开始上传的数据) |
适用于不知道消费位置,但想从指定的时间或者从已知上次消费的停止时间开始消费的场景 |
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 |
//初始化DIS客户端实例 DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") // 认证用的ak和sk硬编码到代码中或者明文存储都有很大的安全风险,建议在配置文件或者环境变量中密文存放,使用时解密,确保安全; // 本示例以ak和sk保存在环境变量中来实现身份验证为例,运行本示例前请先在本地环境中设置环境变量HUAWEICLOUD_SDK_AK和HUAWEICLOUD_SDK_SK。 .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("xxxx") .withRegion("xxxx") .build(); // 配置通道名称 String streamName = "streamName"; // 配置数据下载分区ID String partitionId = "shardId-0000000000"; // 配置下载数据序列号 String startingSequenceNumber = "0"; // 配置下载数据方式 // AT_SEQUENCE_NUMBER: 从指定的sequenceNumber开始获取,需要设置StartingSequenceNumber // AFTER_SEQUENCE_NUMBER: 从指定的sequenceNumber之后开始获取,需要设置StartingSequenceNumber String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name(); try { // 获取数据游标 GetPartitionCursorRequest request = new GetPartitionCursorRequest(); request.setStreamName(streamName); request.setPartitionId(partitionId); request.setCursorType(cursorType); request.setStartingSequenceNumber(startingSequenceNumber); GetPartitionCursorResult response = dic.getPartitionCursor(request); String cursor = response.getPartitionCursor(); LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor); GetRecordsRequest recordsRequest = new GetRecordsRequest(); GetRecordsResult recordResponse = null; while (true) { recordsRequest.setPartitionCursor(cursor); recordResponse = dic.getRecords(recordsRequest); // 下一批数据游标 cursor = recordResponse.getNextPartitionCursor(); for (Record record : recordResponse.getRecords()) { LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].", new String(record.getData().array()), record.getPartitionKey(), record.getSequenceNumber()); } } } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } } |
- 下载数据方式选择TRIM_HORIZON和 LATEST时,样例代码示例如下,参见加粗代码行,基于demo注释掉startingSequenceNumber字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 |
//初始化DIS客户端实例 DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("xxxx") .withRegion("xxxx") .build(); // 配置通道名称 String streamName = "streamName"; // 配置数据下载分区ID String partitionId = "shardId-0000000000"; // 配置下载数据序列号 //String startingSequenceNumber = "0"; // 配置下载数据方式 // TRIM_HORIZON: 从最早被存储至分区的有效记录开始读取。 //LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。 String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name(); try { // 获取数据游标 GetPartitionCursorRequest request = new GetPartitionCursorRequest(); request.setStreamName(streamName); request.setPartitionId(partitionId); request.setCursorType(cursorType); //request.setStartingSequenceNumber(startingSequenceNumber); GetPartitionCursorResult response = dic.getPartitionCursor(request); String cursor = response.getPartitionCursor(); LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor); GetRecordsRequest recordsRequest = new GetRecordsRequest(); GetRecordsResult recordResponse = null; while (true) { recordsRequest.setPartitionCursor(cursor); recordsRequest.setLimit(limit); recordResponse = dic.getRecords(recordsRequest); // 下一批数据游标 cursor = recordResponse.getNextPartitionCursor(); for (Record record : recordResponse.getRecords()) { LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].", new String(record.getData().array()), record.getPartitionKey(), record.getSequenceNumber()); } if (recordResponse.getRecords().size() == 0) { Thread.sleep(1000); } } } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } } |
- 下载数据方式选择TAT_TIMESTAMP时,样例代码示例如下,基于demo增加timestamp字段,添加如下加粗行代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 |
//初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。 DIS dic = DISClientBuilder.standard() .withEndpoint("xxxx") .withAk(System.getenv("HUAWEICLOUD_SDK_AK")) .withSk(System.getenv("HUAWEICLOUD_SDK_SK")) .withProjectId("xxxx") .withRegion("xxxx") .build(); // 配置通道名称 String streamName = "streamName"; // 配置数据下载分区ID String partitionId = "shardId-0000000000"; // 配置下载数据序列号 //String startingSequenceNumber = "0"; //配置时间戳 long timestamp = 1542960693804L; // 配置下载数据方式 // AT_TIMESTAMP: 从特定时间戳(即timestamp定义的时间戳)开始读取。 String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name(); try { // 获取数据游标 GetPartitionCursorRequest request = new GetPartitionCursorRequest(); request.setStreamName(streamName); request.setPartitionId(partitionId); request.setCursorType(cursorType); //request.setStartingSequenceNumber(startingSequenceNumber); request.setTimestamp(timestamp); GetPartitionCursorResult response = dic.getPartitionCursor(request); String cursor = response.getPartitionCursor(); LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor); GetRecordsRequest recordsRequest = new GetRecordsRequest(); GetRecordsResult recordResponse = null; while (true) { recordsRequest.setPartitionCursor(cursor); recordsRequest.setLimit(limit); recordResponse = dic.getRecords(recordsRequest); // 下一批数据游标 cursor = recordResponse.getNextPartitionCursor(); for (Record record : recordResponse.getRecords()) { LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].", new String(record.getData().array()), record.getPartitionKey(), record.getSequenceNumber()); } if (recordResponse.getRecords().size() == 0) { Thread.sleep(1000); } } } catch (DISClientException e) { LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]", e.getMessage(), e); } catch (Exception e) { LOGGER.error(e.getMessage(), e); } } } |
参数说明
参数名 |
参数类型 |
说明 |
---|---|---|
partitionId |
String |
分区ID。
说明:
请根据上传流式数据的执行结果,控制台的返回信息字段,例如 “partitionId [shardId-0000000000]”进行定义。 |
startingSequenceNumber |
String |
序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。
说明:
请根据上传流式数据的执行结果,控制台的返回信息字段,例如“sequenceNumber [1]”进行定义。 |
cursorType |
String |
游标类型。
|
运行程序
右键选择
运行程序,若程序运行成功,可以在控制台查看到类似如下信息:
1 2 3 4 5 6 7 |
14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader 14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader 14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2 14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ 14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0]. 14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1]. 14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2]. |
参数名 |
参数类型 |
说明 |
---|---|---|
partition_key |
String |
用户上传数据时设置的partition_key。
说明:
上传数据时,如果传了partition_key参数,则下载数据时可返回此参数。如果上传数据时,未传partition_key参数,而是传入partition_id,则不返回partition_key。 |
startingSequenceNumber |
String |
序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 |