下载流式数据
背景信息
下载流式数据,需要确定从分区的什么位置开始获取(即获取游标)。确定起始位置后,再循环获取数据。
获取游标有如下五种方式:
- AT_SEQUENCE_NUMBER
- AFTER_SEQUENCE_NUMBER
- TRIM_HORIZON
- LATEST
- AT_TIMESTAMP
为更好理解游标类型,您需要了解如下几个基本概念。
- 序列号(sequenceNumber),每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecord操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。
- 每个分区的sequenceNumber从0开始持续增长,每条数据对应唯一的sequenceNumber,超过生命周期后此sequenceNumber将过期不可用。(例如上传一条数据到新分区,其sequenceNumber起始为0,上传100条之后,则最后一条的sequenceNumber为99;如超过生命周期之后,0~99的数据则不可用)
- 分区的数据有效范围可以通过调用describeStream(查询通道详情)接口获取,其sequenceNumberRange代表数据有效范围,第一个值为最老数据的sequenceNumber,最后一个值为下一条上传数据的sequenceNumber(最新数据的sequenceNumber为此值-1)
例如[100, 200],表示此分区总共上传了200条数据,其中第0~99条已过期,有效的最老数据为100,最新数据为199,下一条上传数据的sequenceNumber为200。
场景说明
下表介绍5种下载数据方式的适用场景,您可依据自己的需求进行适配。
游标类型(CursorType) |
说明 |
适用场景 |
备注 |
---|---|---|---|
AT_SEQUENCE_NUMBER |
从特定序列号(即demo中starting-sequence-number定义的序列号)所在的记录开始读取数据。此类型为默认游标类型。 |
适用于有明确的起始sequenceNumber场景,例如已知需要从哪一条数据开始消费。 |
与序列号(sequenceNumber)和分区数据有效范围sequenceNumberRange【A,B】强相关。 指定的sequenceNumber应满足如下条件: A<=sequenceNumber<=B |
AFTER_SEQUENCE_NUMBER |
从特定序列号(即demo中starting-sequence-number定义的序列号)后的记录开始读取数据。 |
适用于保存了上次消费位置的场景,例如每次消费都保存位置(记录到文件或checkpoint),若程序重启则可以从保存的位置之后开始恢复,此时用AT_SEQUENCE_NUMBER则会重复一条数据。 |
与序列号(sequenceNumber)和分区数据有效范围sequenceNumberRange【A,B】强相关。 指定的sequenceNumber应满足如下条件: (A-1)<=sequenceNumber<=(B-1) |
TRIM_HORIZON |
从分区最老的数据开始消费,即读取分区内所有有效数据。 例如分区数据有效范围为[100, 200], 则会从100开始消费。 |
适用于不知道消费位置,则直接消费分区内所有有效数据的场景。 |
无 |
LATEST |
从分区最新的数据之后开始消费,即不读取分区内的已有数据,而是从下一条上传的数据开始。 (如分区数据有效范围为[100, 200], 则会从200开始消费,如此时无数据上传,则获取的数据为空;如有数据上传,就会得到200,201,202,...) |
适用于不知道消费位置,则丢弃分区已有的数据,从新上传的数据开始消费的场景。 |
无 |
AT_TIMESTAMP |
指定一个时间戳,会从此时间戳上传的数据开始读取,要求在获取游标的时候,有一个明确的13位时间戳。 (例如指定1541742263206,表示读取从2018-11-09 13:44:23开始上传的数据) |
适用于不知道消费位置,但想从指定的时间或者从已知上次消费的停止时间开始消费的场景 |
|
样例代码
使用初始化DIS客户端初始化后的客户端实例通过DIS通道获取数据。
其中,“streamName”的配置值要与开通DIS通道中“通道名称”的值一致,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见获取认证信息。
- 下载数据方式选择AT_SEQUENCE_NUMBER和AFTER_SEQUENCE_NUMBER时,样例代码示例如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 | //初始化DIS客户端实例
DIS dic = DISClientBuilder.standard()
.withEndpoint("xxxx")
.withAk("xxxx")
.withSk("xxxx")
.withProjectId("xxxx")
.withRegion("xxxx")
.build();
// 配置通道名称
String streamName = "streamName";
// 配置数据下载分区ID
String partitionId = "shardId-0000000000";
// 配置下载数据序列号
String startingSequenceNumber = "0";
// 配置下载数据方式
// AT_SEQUENCE_NUMBER: 从指定的sequenceNumber开始获取,需要设置StartingSequenceNumber
// AFTER_SEQUENCE_NUMBER: 从指定的sequenceNumber之后开始获取,需要设置StartingSequenceNumber
String cursorType = PartitionCursorTypeEnum.AT_SEQUENCE_NUMBER.name();
try
{
// 获取数据游标
GetPartitionCursorRequest request = new GetPartitionCursorRequest();
request.setStreamName(streamName);
request.setPartitionId(partitionId);
request.setCursorType(cursorType);
request.setStartingSequenceNumber(startingSequenceNumber);
GetPartitionCursorResult response = dic.getPartitionCursor(request);
String cursor = response.getPartitionCursor();
LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
GetRecordsRequest recordsRequest = new GetRecordsRequest();
GetRecordsResult recordResponse = null;
while (true)
{
recordsRequest.setPartitionCursor(cursor);
recordResponse = dic.getRecords(recordsRequest);
// 下一批数据游标
cursor = recordResponse.getNextPartitionCursor();
for (Record record : recordResponse.getRecords())
{
LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
new String(record.getData().array()),
record.getPartitionKey(),
record.getSequenceNumber());
}
}
}
catch (DISClientException e)
{
LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
e.getMessage(),
e);
}
catch (Exception e)
{
LOGGER.error(e.getMessage(), e);
}
}
}
|
- 下载数据方式选择TRIM_HORIZON和 LATEST时,样例代码示例如下,参见加粗代码行,基于demo注释掉startingSequenceNumber字段。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 | //初始化DIS客户端实例
DIS dic = DISClientBuilder.standard()
.withEndpoint("xxxx")
.withAk("xxxx")
.withSk("xxxx")
.withProjectId("xxxx")
.withRegion("xxxx")
.build();
// 配置通道名称
String streamName = "streamName";
// 配置数据下载分区ID
String partitionId = "shardId-0000000000";
// 配置下载数据序列号
//String startingSequenceNumber = "0";
// 配置下载数据方式
// TRIM_HORIZON: 从最早被存储至分区的有效记录开始读取。
//LATEST:从分区中的最新记录开始读取,此设置可以保证你总是读到分区中最新记录。
String cursorType = PartitionCursorTypeEnum.TRIM_HORIZON.name();
try
{
// 获取数据游标
GetPartitionCursorRequest request = new GetPartitionCursorRequest();
request.setStreamName(streamName);
request.setPartitionId(partitionId);
request.setCursorType(cursorType);
//request.setStartingSequenceNumber(startingSequenceNumber);
GetPartitionCursorResult response = dic.getPartitionCursor(request);
String cursor = response.getPartitionCursor();
LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
GetRecordsRequest recordsRequest = new GetRecordsRequest();
GetRecordsResult recordResponse = null;
while (true)
{
recordsRequest.setPartitionCursor(cursor);
recordsRequest.setLimit(limit);
recordResponse = dic.getRecords(recordsRequest);
// 下一批数据游标
cursor = recordResponse.getNextPartitionCursor();
for (Record record : recordResponse.getRecords())
{
LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
new String(record.getData().array()),
record.getPartitionKey(),
record.getSequenceNumber());
}
if (recordResponse.getRecords().size() == 0)
{
Thread.sleep(1000);
}
}
}
catch (DISClientException e)
{
LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
e.getMessage(),
e);
}
catch (Exception e)
{
LOGGER.error(e.getMessage(), e);
}
}
}
|
- 下载数据方式选择TAT_TIMESTAMP时,样例代码示例如下,基于demo增加timestamp字段,添加如下加粗行代码。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 | //初始化DIS客户端实例,其中,“endpoint”,“ak”,“sk”,“region”,“projectId”信息请参见。
DIS dic = DISClientBuilder.standard()
.withEndpoint("xxxx")
.withAk("xxxx")
.withSk("xxxx")
.withProjectId("xxxx")
.withRegion("xxxx")
.build();
// 配置通道名称
String streamName = "streamName";
// 配置数据下载分区ID
String partitionId = "shardId-0000000000";
// 配置下载数据序列号
//String startingSequenceNumber = "0";
//配置时间戳
long timestamp = 1542960693804L;
// 配置下载数据方式
// AT_TIMESTAMP: 从特定时间戳(即timestamp定义的时间戳)开始读取。
String cursorType = PartitionCursorTypeEnum.AT_TIMESTAMP.name();
try
{
// 获取数据游标
GetPartitionCursorRequest request = new GetPartitionCursorRequest();
request.setStreamName(streamName);
request.setPartitionId(partitionId);
request.setCursorType(cursorType);
//request.setStartingSequenceNumber(startingSequenceNumber);
request.setTimestamp(timestamp);
GetPartitionCursorResult response = dic.getPartitionCursor(request);
String cursor = response.getPartitionCursor();
LOGGER.info("Get stream {}[partitionId={}] cursor success : {}", streamName, partitionId, cursor);
GetRecordsRequest recordsRequest = new GetRecordsRequest();
GetRecordsResult recordResponse = null;
while (true)
{
recordsRequest.setPartitionCursor(cursor);
recordsRequest.setLimit(limit);
recordResponse = dic.getRecords(recordsRequest);
// 下一批数据游标
cursor = recordResponse.getNextPartitionCursor();
for (Record record : recordResponse.getRecords())
{
LOGGER.info("Get record [{}], partitionKey [{}], sequenceNumber [{}].",
new String(record.getData().array()),
record.getPartitionKey(),
record.getSequenceNumber());
}
if (recordResponse.getRecords().size() == 0)
{
Thread.sleep(1000);
}
}
}
catch (DISClientException e)
{
LOGGER.error("Failed to get a normal response, please check params and retry. Error message [{}]",
e.getMessage(),
e);
}
catch (Exception e)
{
LOGGER.error(e.getMessage(), e);
}
}
}
|
参数说明
参数名 |
参数类型 |
说明 |
---|---|---|
partitionId |
String |
分区ID。 说明:
请根据上传流式数据的执行结果,控制台的返回信息字段,例如 “partitionId [shardId-0000000000]”进行定义。 |
startingSequenceNumber |
String |
序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 说明:
请根据上传流式数据的执行结果,控制台的返回信息字段,例如“sequenceNumber [1]”进行定义。 |
cursorType |
String |
游标类型。
|
运行程序
右键选择
运行程序,若程序运行成功,可以在控制台查看到类似如下信息:1 2 3 4 5 6 7 | 14:55:42.954 [main] INFOcom.bigdata.dis.sdk.DISConfig - get from classLoader
14:55:44.103 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - get from classLoader
14:55:44.105 [main] INFOcom.bigdata.dis.sdk.util.config.ConfigurationUtils - propertyMapFromFile size : 2
14:55:45.235 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get stream streamName[partitionId=0] cursor success : eyJnZXRJdGVyYXRvclBhcmFtIjp7InN0cmVhbS1uYW1lIjoiZGlzLTEzbW9uZXkiLCJwYXJ0aXRpb24taWQiOiIwIiwiY3Vyc29yLXR5cGUiOiJBVF9TRVFVRU5DRV9OVU1CRVIiLCJzdGFydGluZy1zZXF1ZW5jZS1udW1iZXIiOiIxMDY4OTcyIn0sImdlbmVyYXRlVGltZXN0YW1wIjoxNTEzNjY2NjMxMTYxfQ
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [964885], sequenceNumber [0].
14:55:45.305 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [910960], sequenceNumber [1].
14:55:46.359 [main] INFOcom.bigdata.dis.sdk.demo.ConsumerDemo - Get Record [hello world.], partitionKey [528377], sequenceNumber [2].
|
参数名 |
参数类型 |
说明 |
---|---|---|
partition_key |
String |
用户上传数据时设置的partition_key。 说明:
上传数据时,如果传了partition_key参数,则下载数据时可返回此参数。如果上传数据时,未传partition_key参数,而是传入partition_id,则不返回partition_key。 |
startingSequenceNumber |
String |
序列号。序列号是每个记录的唯一标识符。序列号由DIS在数据生产者调用PutRecords操作以添加数据到DIS数据通道时DIS服务自动分配的。同一分区键的序列号通常会随时间变化增加。PutRecords请求之间的时间段越长,序列号越大。 |
